22319: Add replace_segments API.
[arvados.git] / sdk / go / arvados / fs_collection_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package arvados
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "errors"
12         "fmt"
13         "io"
14         "io/ioutil"
15         "math/rand"
16         "net/http"
17         "os"
18         "regexp"
19         "runtime"
20         "runtime/pprof"
21         "strings"
22         "sync"
23         "sync/atomic"
24         "testing"
25         "time"
26
27         check "gopkg.in/check.v1"
28 )
29
30 var _ = check.Suite(&CollectionFSSuite{})
31
32 type keepClientStub struct {
33         blocks      map[string][]byte
34         refreshable map[string]bool
35         reads       []string             // locators from ReadAt() calls
36         onWrite     func(bufcopy []byte) // called from WriteBlock, before acquiring lock
37         authToken   string               // client's auth token (used for signing locators)
38         sigkey      string               // blob signing key
39         sigttl      time.Duration        // blob signing ttl
40         sync.RWMutex
41 }
42
43 var errStub404 = errors.New("404 block not found")
44
45 func (kcs *keepClientStub) ReadAt(locator string, p []byte, off int) (int, error) {
46         kcs.Lock()
47         kcs.reads = append(kcs.reads, locator)
48         kcs.Unlock()
49         kcs.RLock()
50         defer kcs.RUnlock()
51         if err := VerifySignature(locator, kcs.authToken, kcs.sigttl, []byte(kcs.sigkey)); err != nil {
52                 return 0, err
53         }
54         buf := kcs.blocks[locator[:32]]
55         if buf == nil {
56                 return 0, errStub404
57         }
58         return copy(p, buf[off:]), nil
59 }
60
61 func (kcs *keepClientStub) BlockWrite(_ context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) {
62         if opts.Data == nil {
63                 panic("oops, stub is not made for this")
64         }
65         locator := SignLocator(fmt.Sprintf("%x+%d", md5.Sum(opts.Data), len(opts.Data)), kcs.authToken, time.Now().Add(kcs.sigttl), kcs.sigttl, []byte(kcs.sigkey))
66         buf := make([]byte, len(opts.Data))
67         copy(buf, opts.Data)
68         if kcs.onWrite != nil {
69                 kcs.onWrite(buf)
70         }
71         for _, sc := range opts.StorageClasses {
72                 if sc != "default" {
73                         return BlockWriteResponse{}, fmt.Errorf("stub does not write storage class %q", sc)
74                 }
75         }
76         kcs.Lock()
77         defer kcs.Unlock()
78         kcs.blocks[locator[:32]] = buf
79         return BlockWriteResponse{Locator: locator, Replicas: 1}, nil
80 }
81
82 var reRemoteSignature = regexp.MustCompile(`\+[AR][^+]*`)
83
84 func (kcs *keepClientStub) LocalLocator(locator string) (string, error) {
85         if strings.Contains(locator, "+A") {
86                 return locator, nil
87         }
88         kcs.Lock()
89         defer kcs.Unlock()
90         if strings.Contains(locator, "+R") {
91                 if len(locator) < 32 {
92                         return "", fmt.Errorf("bad locator: %q", locator)
93                 }
94                 if _, ok := kcs.blocks[locator[:32]]; !ok && !kcs.refreshable[locator[:32]] {
95                         return "", fmt.Errorf("kcs.refreshable[%q]==false", locator)
96                 }
97         }
98         locator = reRemoteSignature.ReplaceAllLiteralString(locator, "")
99         locator = SignLocator(locator, kcs.authToken, time.Now().Add(kcs.sigttl), kcs.sigttl, []byte(kcs.sigkey))
100         return locator, nil
101 }
102
103 type CollectionFSSuite struct {
104         client *Client
105         coll   Collection
106         fs     CollectionFileSystem
107         kc     *keepClientStub
108 }
109
110 func (s *CollectionFSSuite) SetUpTest(c *check.C) {
111         s.client = NewClientFromEnv()
112         s.client.AuthToken = fixtureActiveToken
113         err := s.client.RequestAndDecode(&s.coll, "GET", "arvados/v1/collections/"+fixtureFooAndBarFilesInDirUUID, nil, nil)
114         c.Assert(err, check.IsNil)
115         s.kc = &keepClientStub{
116                 blocks: map[string][]byte{
117                         "3858f62230ac3c915f300c664312c63f": []byte("foobar"),
118                 },
119                 sigkey:    fixtureBlobSigningKey,
120                 sigttl:    fixtureBlobSigningTTL,
121                 authToken: fixtureActiveToken,
122         }
123         s.fs, err = s.coll.FileSystem(s.client, s.kc)
124         c.Assert(err, check.IsNil)
125 }
126
127 func (s *CollectionFSSuite) TestSyncNonCanonicalManifest(c *check.C) {
128         var coll Collection
129         err := s.client.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+fixtureFooAndBarFilesInDirUUID, nil, nil)
130         c.Assert(err, check.IsNil)
131         mtxt := strings.Replace(coll.ManifestText, "3:3:bar 0:3:foo", "0:3:foo 3:3:bar", -1)
132         c.Assert(mtxt, check.Not(check.Equals), coll.ManifestText)
133         err = s.client.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
134                 "collection": map[string]interface{}{
135                         "manifest_text": mtxt}})
136         c.Assert(err, check.IsNil)
137         // In order for the rest of the test to work as intended, the API server
138         // needs to retain the file ordering we set manually. We check that here.
139         // We can't check `mtxt == coll.ManifestText` because the API server
140         // might've returned new block signatures if the GET and POST happened in
141         // different seconds.
142         expectPattern := `\./dir1 \S+ 0:3:foo 3:3:bar\n`
143         c.Assert(coll.ManifestText, check.Matches, expectPattern)
144
145         fs, err := coll.FileSystem(s.client, s.kc)
146         c.Assert(err, check.IsNil)
147         err = fs.Sync()
148         c.Check(err, check.IsNil)
149
150         // fs had no local changes, so Sync should not have saved
151         // anything back to the API/database. (If it did, we would see
152         // the manifest rewritten in canonical order.)
153         var saved Collection
154         err = s.client.RequestAndDecode(&saved, "GET", "arvados/v1/collections/"+coll.UUID, nil, nil)
155         c.Assert(err, check.IsNil)
156         c.Check(saved.ManifestText, check.Matches, expectPattern)
157 }
158
159 func (s *CollectionFSSuite) TestHttpFileSystemInterface(c *check.C) {
160         _, ok := s.fs.(http.FileSystem)
161         c.Check(ok, check.Equals, true)
162 }
163
164 func (s *CollectionFSSuite) TestUnattainableStorageClasses(c *check.C) {
165         fs, err := (&Collection{
166                 StorageClassesDesired: []string{"unobtainium"},
167         }).FileSystem(s.client, s.kc)
168         c.Assert(err, check.IsNil)
169
170         f, err := fs.OpenFile("/foo", os.O_CREATE|os.O_WRONLY, 0777)
171         c.Assert(err, check.IsNil)
172         _, err = f.Write([]byte("food"))
173         c.Assert(err, check.IsNil)
174         err = f.Close()
175         c.Assert(err, check.IsNil)
176         _, err = fs.MarshalManifest(".")
177         c.Assert(err, check.ErrorMatches, `.*stub does not write storage class \"unobtainium\"`)
178 }
179
180 func (s *CollectionFSSuite) TestColonInFilename(c *check.C) {
181         fs, err := (&Collection{
182                 ManifestText: "./foo:foo 3858f62230ac3c915f300c664312c63f+3 0:3:bar:bar\n",
183         }).FileSystem(s.client, s.kc)
184         c.Assert(err, check.IsNil)
185
186         f, err := fs.Open("/foo:foo")
187         c.Assert(err, check.IsNil)
188
189         fis, err := f.Readdir(0)
190         c.Check(err, check.IsNil)
191         c.Check(len(fis), check.Equals, 1)
192         c.Check(fis[0].Name(), check.Equals, "bar:bar")
193 }
194
195 func (s *CollectionFSSuite) TestReaddirFull(c *check.C) {
196         f, err := s.fs.Open("/dir1")
197         c.Assert(err, check.IsNil)
198
199         st, err := f.Stat()
200         c.Assert(err, check.IsNil)
201         c.Check(st.Size(), check.Equals, int64(2))
202         c.Check(st.IsDir(), check.Equals, true)
203
204         fis, err := f.Readdir(0)
205         c.Check(err, check.IsNil)
206         c.Check(len(fis), check.Equals, 2)
207         if len(fis) > 0 {
208                 c.Check(fis[0].Size(), check.Equals, int64(3))
209         }
210 }
211
212 func (s *CollectionFSSuite) TestReaddirLimited(c *check.C) {
213         f, err := s.fs.Open("./dir1")
214         c.Assert(err, check.IsNil)
215
216         fis, err := f.Readdir(1)
217         c.Check(err, check.IsNil)
218         c.Check(len(fis), check.Equals, 1)
219         if len(fis) > 0 {
220                 c.Check(fis[0].Size(), check.Equals, int64(3))
221         }
222
223         fis, err = f.Readdir(1)
224         c.Check(err, check.IsNil)
225         c.Check(len(fis), check.Equals, 1)
226         if len(fis) > 0 {
227                 c.Check(fis[0].Size(), check.Equals, int64(3))
228         }
229
230         fis, err = f.Readdir(1)
231         c.Check(len(fis), check.Equals, 0)
232         c.Check(err, check.NotNil)
233         c.Check(err, check.Equals, io.EOF)
234
235         f, err = s.fs.Open("dir1")
236         c.Assert(err, check.IsNil)
237         fis, err = f.Readdir(1)
238         c.Check(len(fis), check.Equals, 1)
239         c.Assert(err, check.IsNil)
240         fis, err = f.Readdir(2)
241         c.Check(len(fis), check.Equals, 1)
242         c.Assert(err, check.IsNil)
243         fis, err = f.Readdir(2)
244         c.Check(len(fis), check.Equals, 0)
245         c.Assert(err, check.Equals, io.EOF)
246 }
247
248 func (s *CollectionFSSuite) TestPathMunge(c *check.C) {
249         for _, path := range []string{".", "/", "./", "///", "/../", "/./.."} {
250                 f, err := s.fs.Open(path)
251                 c.Assert(err, check.IsNil)
252
253                 st, err := f.Stat()
254                 c.Assert(err, check.IsNil)
255                 c.Check(st.Size(), check.Equals, int64(1))
256                 c.Check(st.IsDir(), check.Equals, true)
257         }
258         for _, path := range []string{"/dir1", "dir1", "./dir1", "///dir1//.//", "../dir1/../dir1/"} {
259                 c.Logf("%q", path)
260                 f, err := s.fs.Open(path)
261                 c.Assert(err, check.IsNil)
262
263                 st, err := f.Stat()
264                 c.Assert(err, check.IsNil)
265                 c.Check(st.Size(), check.Equals, int64(2))
266                 c.Check(st.IsDir(), check.Equals, true)
267         }
268 }
269
270 func (s *CollectionFSSuite) TestNotExist(c *check.C) {
271         for _, path := range []string{"/no", "no", "./no", "n/o", "/n/o"} {
272                 f, err := s.fs.Open(path)
273                 c.Assert(f, check.IsNil)
274                 c.Assert(err, check.NotNil)
275                 c.Assert(os.IsNotExist(err), check.Equals, true)
276         }
277 }
278
279 func (s *CollectionFSSuite) TestReadOnlyFile(c *check.C) {
280         f, err := s.fs.OpenFile("/dir1/foo", os.O_RDONLY, 0)
281         c.Assert(err, check.IsNil)
282         st, err := f.Stat()
283         c.Assert(err, check.IsNil)
284         c.Check(st.Size(), check.Equals, int64(3))
285         n, err := f.Write([]byte("bar"))
286         c.Check(n, check.Equals, 0)
287         c.Check(err, check.Equals, ErrReadOnlyFile)
288 }
289
290 func (s *CollectionFSSuite) TestCreateFile(c *check.C) {
291         f, err := s.fs.OpenFile("/new-file 1", os.O_RDWR|os.O_CREATE, 0)
292         c.Assert(err, check.IsNil)
293         st, err := f.Stat()
294         c.Assert(err, check.IsNil)
295         c.Check(st.Size(), check.Equals, int64(0))
296
297         n, err := f.Write([]byte("bar"))
298         c.Check(n, check.Equals, 3)
299         c.Check(err, check.IsNil)
300
301         c.Check(f.Close(), check.IsNil)
302
303         f, err = s.fs.OpenFile("/new-file 1", os.O_RDWR|os.O_CREATE|os.O_EXCL, 0)
304         c.Check(f, check.IsNil)
305         c.Assert(err, check.NotNil)
306
307         f, err = s.fs.OpenFile("/new-file 1", os.O_RDWR, 0)
308         c.Assert(err, check.IsNil)
309         st, err = f.Stat()
310         c.Assert(err, check.IsNil)
311         c.Check(st.Size(), check.Equals, int64(3))
312
313         c.Check(f.Close(), check.IsNil)
314
315         m, err := s.fs.MarshalManifest(".")
316         c.Assert(err, check.IsNil)
317         c.Check(m, check.Matches, `. 37b51d194a7513e45b56f6524f2d51f2\+3\+\S+ 0:3:new-file\\0401\n./dir1 .* 3:3:bar 0:3:foo\n`)
318 }
319
320 func (s *CollectionFSSuite) TestReadWriteFile(c *check.C) {
321         maxBlockSize = 8
322         defer func() { maxBlockSize = 2 << 26 }()
323
324         f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
325         c.Assert(err, check.IsNil)
326         defer f.Close()
327         st, err := f.Stat()
328         c.Assert(err, check.IsNil)
329         c.Check(st.Size(), check.Equals, int64(3))
330
331         f2, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
332         c.Assert(err, check.IsNil)
333         defer f2.Close()
334
335         buf := make([]byte, 64)
336         n, err := f.Read(buf)
337         c.Check(n, check.Equals, 3)
338         c.Check(err, check.Equals, io.EOF)
339         c.Check(string(buf[:3]), check.DeepEquals, "foo")
340
341         pos, err := f.Seek(-2, io.SeekCurrent)
342         c.Check(pos, check.Equals, int64(1))
343         c.Check(err, check.IsNil)
344
345         // Split a storedExtent in two, and insert a memExtent
346         n, err = f.Write([]byte("*"))
347         c.Check(n, check.Equals, 1)
348         c.Check(err, check.IsNil)
349
350         pos, err = f.Seek(0, io.SeekCurrent)
351         c.Check(pos, check.Equals, int64(2))
352         c.Check(err, check.IsNil)
353
354         pos, err = f.Seek(0, io.SeekStart)
355         c.Check(pos, check.Equals, int64(0))
356         c.Check(err, check.IsNil)
357
358         rbuf, err := ioutil.ReadAll(f)
359         c.Check(len(rbuf), check.Equals, 3)
360         c.Check(err, check.IsNil)
361         c.Check(string(rbuf), check.Equals, "f*o")
362
363         // Write multiple blocks in one call
364         f.Seek(1, io.SeekStart)
365         n, err = f.Write([]byte("0123456789abcdefg"))
366         c.Check(n, check.Equals, 17)
367         c.Check(err, check.IsNil)
368         pos, err = f.Seek(0, io.SeekCurrent)
369         c.Check(pos, check.Equals, int64(18))
370         c.Check(err, check.IsNil)
371         pos, err = f.Seek(-18, io.SeekCurrent)
372         c.Check(pos, check.Equals, int64(0))
373         c.Check(err, check.IsNil)
374         n, err = io.ReadFull(f, buf)
375         c.Check(n, check.Equals, 18)
376         c.Check(err, check.Equals, io.ErrUnexpectedEOF)
377         c.Check(string(buf[:n]), check.Equals, "f0123456789abcdefg")
378
379         buf2, err := ioutil.ReadAll(f2)
380         c.Check(err, check.IsNil)
381         c.Check(string(buf2), check.Equals, "f0123456789abcdefg")
382
383         // truncate to current size
384         err = f.Truncate(18)
385         c.Check(err, check.IsNil)
386         f2.Seek(0, io.SeekStart)
387         buf2, err = ioutil.ReadAll(f2)
388         c.Check(err, check.IsNil)
389         c.Check(string(buf2), check.Equals, "f0123456789abcdefg")
390
391         // shrink to zero some data
392         f.Truncate(15)
393         f2.Seek(0, io.SeekStart)
394         buf2, err = ioutil.ReadAll(f2)
395         c.Check(err, check.IsNil)
396         c.Check(string(buf2), check.Equals, "f0123456789abcd")
397
398         // grow to partial block/extent
399         f.Truncate(20)
400         f2.Seek(0, io.SeekStart)
401         buf2, err = ioutil.ReadAll(f2)
402         c.Check(err, check.IsNil)
403         c.Check(string(buf2), check.Equals, "f0123456789abcd\x00\x00\x00\x00\x00")
404
405         f.Truncate(0)
406         f2.Seek(0, io.SeekStart)
407         f2.Write([]byte("12345678abcdefghijkl"))
408
409         // grow to block/extent boundary
410         f.Truncate(64)
411         f2.Seek(0, io.SeekStart)
412         buf2, err = ioutil.ReadAll(f2)
413         c.Check(err, check.IsNil)
414         c.Check(len(buf2), check.Equals, 64)
415         c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 8)
416
417         // shrink to block/extent boundary
418         err = f.Truncate(32)
419         c.Check(err, check.IsNil)
420         f2.Seek(0, io.SeekStart)
421         buf2, err = ioutil.ReadAll(f2)
422         c.Check(err, check.IsNil)
423         c.Check(len(buf2), check.Equals, 32)
424         c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 4)
425
426         // shrink to partial block/extent
427         err = f.Truncate(15)
428         c.Check(err, check.IsNil)
429         f2.Seek(0, io.SeekStart)
430         buf2, err = ioutil.ReadAll(f2)
431         c.Check(err, check.IsNil)
432         c.Check(string(buf2), check.Equals, "12345678abcdefg")
433         c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 2)
434
435         // Force flush to ensure the block "12345678" gets stored, so
436         // we know what to expect in the final manifest below.
437         _, err = s.fs.MarshalManifest(".")
438         c.Check(err, check.IsNil)
439
440         // Truncate to size=3 while f2's ptr is at 15
441         err = f.Truncate(3)
442         c.Check(err, check.IsNil)
443         buf2, err = ioutil.ReadAll(f2)
444         c.Check(err, check.IsNil)
445         c.Check(string(buf2), check.Equals, "")
446         f2.Seek(0, io.SeekStart)
447         buf2, err = ioutil.ReadAll(f2)
448         c.Check(err, check.IsNil)
449         c.Check(string(buf2), check.Equals, "123")
450         c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 1)
451
452         m, err := s.fs.MarshalManifest(".")
453         c.Check(err, check.IsNil)
454         m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "")
455         c.Check(m, check.Equals, "./dir1 3858f62230ac3c915f300c664312c63f+6 25d55ad283aa400af464c76d713c07ad+8 3:3:bar 6:3:foo\n")
456         c.Check(s.fs.Size(), check.Equals, int64(6))
457 }
458
459 func (s *CollectionFSSuite) TestSeekSparse(c *check.C) {
460         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
461         c.Assert(err, check.IsNil)
462         f, err := fs.OpenFile("test", os.O_CREATE|os.O_RDWR, 0755)
463         c.Assert(err, check.IsNil)
464         defer f.Close()
465
466         checkSize := func(size int64) {
467                 fi, err := f.Stat()
468                 c.Assert(err, check.IsNil)
469                 c.Check(fi.Size(), check.Equals, size)
470
471                 f, err := fs.OpenFile("test", os.O_CREATE|os.O_RDWR, 0755)
472                 c.Assert(err, check.IsNil)
473                 defer f.Close()
474                 fi, err = f.Stat()
475                 c.Check(err, check.IsNil)
476                 c.Check(fi.Size(), check.Equals, size)
477                 pos, err := f.Seek(0, io.SeekEnd)
478                 c.Check(err, check.IsNil)
479                 c.Check(pos, check.Equals, size)
480         }
481
482         f.Seek(2, io.SeekEnd)
483         checkSize(0)
484         f.Write([]byte{1})
485         checkSize(3)
486
487         f.Seek(2, io.SeekCurrent)
488         checkSize(3)
489         f.Write([]byte{})
490         checkSize(5)
491
492         f.Seek(8, io.SeekStart)
493         checkSize(5)
494         n, err := f.Read(make([]byte, 1))
495         c.Check(n, check.Equals, 0)
496         c.Check(err, check.Equals, io.EOF)
497         checkSize(5)
498         f.Write([]byte{1, 2, 3})
499         checkSize(11)
500 }
501
502 func (s *CollectionFSSuite) TestMarshalCopiesRemoteBlocks(c *check.C) {
503         foo := "foo"
504         bar := "bar"
505         hash := map[string]string{
506                 foo: fmt.Sprintf("%x", md5.Sum([]byte(foo))),
507                 bar: fmt.Sprintf("%x", md5.Sum([]byte(bar))),
508         }
509
510         fs, err := (&Collection{
511                 ManifestText: ". " + hash[foo] + "+3+Rzaaaa-foo@bab " + hash[bar] + "+3+A12345@ffffff 0:2:fo.txt 2:4:obar.txt\n",
512         }).FileSystem(s.client, s.kc)
513         c.Assert(err, check.IsNil)
514         manifest, err := fs.MarshalManifest(".")
515         c.Check(manifest, check.Equals, "")
516         c.Check(err, check.NotNil)
517
518         s.kc.refreshable = map[string]bool{hash[bar]: true}
519
520         for _, sigIn := range []string{"Rzaaaa-foo@bab", "A12345@abcde"} {
521                 fs, err = (&Collection{
522                         ManifestText: ". " + hash[foo] + "+3+A12345@fffff " + hash[bar] + "+3+" + sigIn + " 0:2:fo.txt 2:4:obar.txt\n",
523                 }).FileSystem(s.client, s.kc)
524                 c.Assert(err, check.IsNil)
525                 manifest, err := fs.MarshalManifest(".")
526                 c.Check(err, check.IsNil)
527                 // Both blocks should now have +A signatures.
528                 c.Check(manifest, check.Matches, `.*\+A.* .*\+A.*\n`)
529                 c.Check(manifest, check.Not(check.Matches), `.*\+R.*\n`)
530         }
531 }
532
533 func (s *CollectionFSSuite) TestMarshalSmallBlocks(c *check.C) {
534         maxBlockSize = 8
535         defer func() { maxBlockSize = 2 << 26 }()
536
537         var err error
538         s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
539         c.Assert(err, check.IsNil)
540         for _, name := range []string{"foo", "bar", "baz"} {
541                 f, err := s.fs.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0)
542                 c.Assert(err, check.IsNil)
543                 f.Write([]byte(name))
544                 f.Close()
545         }
546
547         m, err := s.fs.MarshalManifest(".")
548         c.Check(err, check.IsNil)
549         m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "")
550         c.Check(m, check.Equals, ". c3c23db5285662ef7172373df0003206+6 acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:bar 3:3:baz 6:3:foo\n")
551 }
552
553 func (s *CollectionFSSuite) TestMkdir(c *check.C) {
554         err := s.fs.Mkdir("foo/bar", 0755)
555         c.Check(err, check.Equals, os.ErrNotExist)
556
557         f, err := s.fs.OpenFile("foo/bar", os.O_CREATE, 0)
558         c.Check(err, check.Equals, os.ErrNotExist)
559
560         err = s.fs.Mkdir("foo", 0755)
561         c.Check(err, check.IsNil)
562
563         f, err = s.fs.OpenFile("foo/bar", os.O_CREATE|os.O_WRONLY, 0)
564         c.Check(err, check.IsNil)
565         if err == nil {
566                 defer f.Close()
567                 f.Write([]byte("foo"))
568         }
569
570         // mkdir fails if a file already exists with that name
571         err = s.fs.Mkdir("foo/bar", 0755)
572         c.Check(err, check.NotNil)
573
574         err = s.fs.Remove("foo/bar")
575         c.Check(err, check.IsNil)
576
577         // mkdir succeeds after the file is deleted
578         err = s.fs.Mkdir("foo/bar", 0755)
579         c.Check(err, check.IsNil)
580
581         // creating a file in a nonexistent subdir should still fail
582         f, err = s.fs.OpenFile("foo/bar/baz/foo.txt", os.O_CREATE|os.O_WRONLY, 0)
583         c.Check(err, check.Equals, os.ErrNotExist)
584
585         f, err = s.fs.OpenFile("foo/bar/foo.txt", os.O_CREATE|os.O_WRONLY, 0)
586         c.Check(err, check.IsNil)
587         if err == nil {
588                 defer f.Close()
589                 f.Write([]byte("foo"))
590         }
591
592         // creating foo/bar as a regular file should fail
593         f, err = s.fs.OpenFile("foo/bar", os.O_CREATE|os.O_EXCL, 0)
594         c.Check(err, check.NotNil)
595
596         // creating foo/bar as a directory should fail
597         f, err = s.fs.OpenFile("foo/bar", os.O_CREATE|os.O_EXCL, os.ModeDir)
598         c.Check(err, check.NotNil)
599         err = s.fs.Mkdir("foo/bar", 0755)
600         c.Check(err, check.NotNil)
601
602         m, err := s.fs.MarshalManifest(".")
603         c.Check(err, check.IsNil)
604         m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "")
605         c.Check(m, check.Equals, "./dir1 3858f62230ac3c915f300c664312c63f+6 3:3:bar 0:3:foo\n./foo/bar acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt\n")
606 }
607
608 func (s *CollectionFSSuite) TestConcurrentWriters(c *check.C) {
609         if testing.Short() {
610                 c.Skip("slow")
611         }
612
613         maxBlockSize = 8
614         defer func() { maxBlockSize = 1 << 26 }()
615
616         var wg sync.WaitGroup
617         for n := 0; n < 128; n++ {
618                 wg.Add(1)
619                 go func() {
620                         defer wg.Done()
621                         f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
622                         c.Assert(err, check.IsNil)
623                         defer f.Close()
624                         for i := 0; i < 1024; i++ {
625                                 r := rand.Uint32()
626                                 switch {
627                                 case r%11 == 0:
628                                         _, err := s.fs.MarshalManifest(".")
629                                         c.Check(err, check.IsNil)
630                                 case r&3 == 0:
631                                         f.Truncate(int64(rand.Intn(64)))
632                                 case r&3 == 1:
633                                         f.Seek(int64(rand.Intn(64)), io.SeekStart)
634                                 case r&3 == 2:
635                                         _, err := f.Write([]byte("beep boop"))
636                                         c.Check(err, check.IsNil)
637                                 case r&3 == 3:
638                                         _, err := ioutil.ReadAll(f)
639                                         c.Check(err, check.IsNil)
640                                 }
641                         }
642                 }()
643         }
644         wg.Wait()
645
646         f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
647         c.Assert(err, check.IsNil)
648         defer f.Close()
649         buf, err := ioutil.ReadAll(f)
650         c.Check(err, check.IsNil)
651         c.Logf("after lots of random r/w/seek/trunc, buf is %q", buf)
652 }
653
654 func (s *CollectionFSSuite) TestRandomWrites(c *check.C) {
655         maxBlockSize = 40
656         defer func() { maxBlockSize = 2 << 26 }()
657
658         var err error
659         s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
660         c.Assert(err, check.IsNil)
661
662         const nfiles = 256
663         const ngoroutines = 256
664
665         var wg sync.WaitGroup
666         for n := 0; n < ngoroutines; n++ {
667                 wg.Add(1)
668                 go func(n int) {
669                         defer wg.Done()
670                         expect := make([]byte, 0, 64)
671                         wbytes := []byte("there's no simple explanation for anything important that any of us do")
672                         f, err := s.fs.OpenFile(fmt.Sprintf("random-%d", n), os.O_RDWR|os.O_CREATE|os.O_EXCL, 0)
673                         c.Assert(err, check.IsNil)
674                         defer f.Close()
675                         for i := 0; i < nfiles; i++ {
676                                 trunc := rand.Intn(65)
677                                 woff := rand.Intn(trunc + 1)
678                                 wbytes = wbytes[:rand.Intn(64-woff+1)]
679                                 for buf, i := expect[:cap(expect)], len(expect); i < trunc; i++ {
680                                         buf[i] = 0
681                                 }
682                                 expect = expect[:trunc]
683                                 if trunc < woff+len(wbytes) {
684                                         expect = expect[:woff+len(wbytes)]
685                                 }
686                                 copy(expect[woff:], wbytes)
687                                 f.Truncate(int64(trunc))
688                                 pos, err := f.Seek(int64(woff), io.SeekStart)
689                                 c.Check(pos, check.Equals, int64(woff))
690                                 c.Check(err, check.IsNil)
691                                 n, err := f.Write(wbytes)
692                                 c.Check(n, check.Equals, len(wbytes))
693                                 c.Check(err, check.IsNil)
694                                 pos, err = f.Seek(0, io.SeekStart)
695                                 c.Check(pos, check.Equals, int64(0))
696                                 c.Check(err, check.IsNil)
697                                 buf, err := ioutil.ReadAll(f)
698                                 c.Check(string(buf), check.Equals, string(expect))
699                                 c.Check(err, check.IsNil)
700                         }
701                 }(n)
702         }
703         wg.Wait()
704
705         for n := 0; n < ngoroutines; n++ {
706                 f, err := s.fs.OpenFile(fmt.Sprintf("random-%d", n), os.O_RDONLY, 0)
707                 c.Assert(err, check.IsNil)
708                 f.(*filehandle).inode.(*filenode).waitPrune()
709                 s.checkMemSize(c, f)
710                 defer f.Close()
711         }
712
713         root, err := s.fs.Open("/")
714         c.Assert(err, check.IsNil)
715         defer root.Close()
716         fi, err := root.Readdir(-1)
717         c.Check(err, check.IsNil)
718         c.Check(len(fi), check.Equals, nfiles)
719
720         _, err = s.fs.MarshalManifest(".")
721         c.Check(err, check.IsNil)
722         // TODO: check manifest content
723 }
724
725 func (s *CollectionFSSuite) TestRemove(c *check.C) {
726         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
727         c.Assert(err, check.IsNil)
728         err = fs.Mkdir("dir0", 0755)
729         c.Assert(err, check.IsNil)
730         err = fs.Mkdir("dir1", 0755)
731         c.Assert(err, check.IsNil)
732         err = fs.Mkdir("dir1/dir2", 0755)
733         c.Assert(err, check.IsNil)
734         err = fs.Mkdir("dir1/dir3", 0755)
735         c.Assert(err, check.IsNil)
736
737         err = fs.Remove("dir0")
738         c.Check(err, check.IsNil)
739         err = fs.Remove("dir0")
740         c.Check(err, check.Equals, os.ErrNotExist)
741
742         err = fs.Remove("dir1/dir2/.")
743         c.Check(err, check.Equals, ErrInvalidArgument)
744         err = fs.Remove("dir1/dir2/..")
745         c.Check(err, check.Equals, ErrInvalidArgument)
746         err = fs.Remove("dir1")
747         c.Check(err, check.Equals, ErrDirectoryNotEmpty)
748         err = fs.Remove("dir1/dir2/../../../dir1")
749         c.Check(err, check.Equals, ErrDirectoryNotEmpty)
750         err = fs.Remove("dir1/dir3/")
751         c.Check(err, check.IsNil)
752         err = fs.RemoveAll("dir1")
753         c.Check(err, check.IsNil)
754         err = fs.RemoveAll("dir1")
755         c.Check(err, check.IsNil)
756 }
757
758 func (s *CollectionFSSuite) TestRenameError(c *check.C) {
759         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
760         c.Assert(err, check.IsNil)
761         err = fs.Mkdir("first", 0755)
762         c.Assert(err, check.IsNil)
763         err = fs.Mkdir("first/second", 0755)
764         c.Assert(err, check.IsNil)
765         f, err := fs.OpenFile("first/second/file", os.O_CREATE|os.O_WRONLY, 0755)
766         c.Assert(err, check.IsNil)
767         f.Write([]byte{1, 2, 3, 4, 5})
768         f.Close()
769         err = fs.Rename("first", "first/second/third")
770         c.Check(err, check.Equals, ErrInvalidArgument)
771         err = fs.Rename("first", "first/third")
772         c.Check(err, check.Equals, ErrInvalidArgument)
773         err = fs.Rename("first/second", "second")
774         c.Check(err, check.IsNil)
775         f, err = fs.OpenFile("second/file", 0, 0)
776         c.Assert(err, check.IsNil)
777         data, err := ioutil.ReadAll(f)
778         c.Check(err, check.IsNil)
779         c.Check(data, check.DeepEquals, []byte{1, 2, 3, 4, 5})
780 }
781
782 func (s *CollectionFSSuite) TestRenameDirectory(c *check.C) {
783         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
784         c.Assert(err, check.IsNil)
785         err = fs.Mkdir("foo", 0755)
786         c.Assert(err, check.IsNil)
787         err = fs.Mkdir("bar", 0755)
788         c.Assert(err, check.IsNil)
789         err = fs.Rename("bar", "baz")
790         c.Check(err, check.IsNil)
791         err = fs.Rename("foo", "baz")
792         c.Check(err, check.NotNil)
793         err = fs.Rename("foo", "baz/")
794         c.Check(err, check.IsNil)
795         err = fs.Rename("baz/foo", ".")
796         c.Check(err, check.Equals, ErrInvalidArgument)
797         err = fs.Rename("baz/foo/", ".")
798         c.Check(err, check.Equals, ErrInvalidArgument)
799 }
800
801 func (s *CollectionFSSuite) TestRename(c *check.C) {
802         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
803         c.Assert(err, check.IsNil)
804         const (
805                 outer = 16
806                 inner = 16
807         )
808         for i := 0; i < outer; i++ {
809                 err = fs.Mkdir(fmt.Sprintf("dir%d", i), 0755)
810                 c.Assert(err, check.IsNil)
811                 for j := 0; j < inner; j++ {
812                         err = fs.Mkdir(fmt.Sprintf("dir%d/dir%d", i, j), 0755)
813                         c.Assert(err, check.IsNil)
814                         for _, fnm := range []string{
815                                 fmt.Sprintf("dir%d/file%d", i, j),
816                                 fmt.Sprintf("dir%d/dir%d/file%d", i, j, j),
817                         } {
818                                 f, err := fs.OpenFile(fnm, os.O_CREATE|os.O_WRONLY, 0755)
819                                 c.Assert(err, check.IsNil)
820                                 _, err = f.Write([]byte("beep"))
821                                 c.Assert(err, check.IsNil)
822                                 f.Close()
823                         }
824                 }
825         }
826         var wg sync.WaitGroup
827         for i := 0; i < outer; i++ {
828                 for j := 0; j < inner; j++ {
829                         wg.Add(1)
830                         go func(i, j int) {
831                                 defer wg.Done()
832                                 oldname := fmt.Sprintf("dir%d/dir%d/file%d", i, j, j)
833                                 newname := fmt.Sprintf("dir%d/newfile%d", i, inner-j-1)
834                                 _, err := fs.Open(newname)
835                                 c.Check(err, check.Equals, os.ErrNotExist)
836                                 err = fs.Rename(oldname, newname)
837                                 c.Check(err, check.IsNil)
838                                 f, err := fs.Open(newname)
839                                 c.Check(err, check.IsNil)
840                                 f.Close()
841                         }(i, j)
842
843                         wg.Add(1)
844                         go func(i, j int) {
845                                 defer wg.Done()
846                                 // oldname does not exist
847                                 err := fs.Rename(
848                                         fmt.Sprintf("dir%d/dir%d/missing", i, j),
849                                         fmt.Sprintf("dir%d/dir%d/file%d", outer-i-1, j, j))
850                                 c.Check(err, check.ErrorMatches, `.*does not exist`)
851
852                                 // newname parent dir does not exist
853                                 err = fs.Rename(
854                                         fmt.Sprintf("dir%d/dir%d", i, j),
855                                         fmt.Sprintf("dir%d/missing/irrelevant", outer-i-1))
856                                 c.Check(err, check.ErrorMatches, `.*does not exist`)
857
858                                 // oldname parent dir is a file
859                                 err = fs.Rename(
860                                         fmt.Sprintf("dir%d/file%d/patherror", i, j),
861                                         fmt.Sprintf("dir%d/irrelevant", i))
862                                 c.Check(err, check.ErrorMatches, `.*not a directory`)
863
864                                 // newname parent dir is a file
865                                 err = fs.Rename(
866                                         fmt.Sprintf("dir%d/dir%d/file%d", i, j, j),
867                                         fmt.Sprintf("dir%d/file%d/patherror", i, inner-j-1))
868                                 c.Check(err, check.ErrorMatches, `.*not a directory`)
869                         }(i, j)
870                 }
871         }
872         wg.Wait()
873
874         f, err := fs.OpenFile("dir1/newfile3", 0, 0)
875         c.Assert(err, check.IsNil)
876         c.Check(f.Size(), check.Equals, int64(4))
877         buf, err := ioutil.ReadAll(f)
878         c.Check(buf, check.DeepEquals, []byte("beep"))
879         c.Check(err, check.IsNil)
880         _, err = fs.Open("dir1/dir1/file1")
881         c.Check(err, check.Equals, os.ErrNotExist)
882 }
883
884 func (s *CollectionFSSuite) TestPersist(c *check.C) {
885         maxBlockSize = 1024
886         defer func() { maxBlockSize = 2 << 26 }()
887
888         var err error
889         s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
890         c.Assert(err, check.IsNil)
891         err = s.fs.Mkdir("d:r", 0755)
892         c.Assert(err, check.IsNil)
893
894         expect := map[string][]byte{}
895
896         var wg sync.WaitGroup
897         for _, name := range []string{"random 1", "random:2", "random\\3", "d:r/random4"} {
898                 buf := make([]byte, 500)
899                 rand.Read(buf)
900                 expect[name] = buf
901
902                 f, err := s.fs.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0)
903                 c.Assert(err, check.IsNil)
904                 // Note: we don't close the file until after the test
905                 // is done. Writes to unclosed files should persist.
906                 defer f.Close()
907
908                 wg.Add(1)
909                 go func() {
910                         defer wg.Done()
911                         for i := 0; i < len(buf); i += 5 {
912                                 _, err := f.Write(buf[i : i+5])
913                                 c.Assert(err, check.IsNil)
914                         }
915                 }()
916         }
917         wg.Wait()
918
919         m, err := s.fs.MarshalManifest(".")
920         c.Check(err, check.IsNil)
921         c.Logf("%q", m)
922
923         root, err := s.fs.Open("/")
924         c.Assert(err, check.IsNil)
925         defer root.Close()
926         fi, err := root.Readdir(-1)
927         c.Check(err, check.IsNil)
928         c.Check(len(fi), check.Equals, 4)
929
930         persisted, err := (&Collection{ManifestText: m}).FileSystem(s.client, s.kc)
931         c.Assert(err, check.IsNil)
932
933         root, err = persisted.Open("/")
934         c.Assert(err, check.IsNil)
935         defer root.Close()
936         fi, err = root.Readdir(-1)
937         c.Check(err, check.IsNil)
938         c.Check(len(fi), check.Equals, 4)
939
940         for name, content := range expect {
941                 c.Logf("read %q", name)
942                 f, err := persisted.Open(name)
943                 c.Assert(err, check.IsNil)
944                 defer f.Close()
945                 buf, err := ioutil.ReadAll(f)
946                 c.Check(err, check.IsNil)
947                 c.Check(buf, check.DeepEquals, content)
948         }
949 }
950
951 func (s *CollectionFSSuite) TestPersistEmptyFilesAndDirs(c *check.C) {
952         var err error
953         s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
954         c.Assert(err, check.IsNil)
955         for _, name := range []string{"dir", "dir/zerodir", "empty", "not empty", "not empty/empty", "zero", "zero/zero"} {
956                 err = s.fs.Mkdir(name, 0755)
957                 c.Assert(err, check.IsNil)
958         }
959
960         expect := map[string][]byte{
961                 "0":                nil,
962                 "00":               {},
963                 "one":              {1},
964                 "dir/0":            nil,
965                 "dir/two":          {1, 2},
966                 "dir/zero":         nil,
967                 "dir/zerodir/zero": nil,
968                 "zero/zero/zero":   nil,
969         }
970         for name, data := range expect {
971                 f, err := s.fs.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0)
972                 c.Assert(err, check.IsNil)
973                 if data != nil {
974                         _, err := f.Write(data)
975                         c.Assert(err, check.IsNil)
976                 }
977                 f.Close()
978         }
979
980         m, err := s.fs.MarshalManifest(".")
981         c.Check(err, check.IsNil)
982         c.Logf("%q", m)
983
984         persisted, err := (&Collection{ManifestText: m}).FileSystem(s.client, s.kc)
985         c.Assert(err, check.IsNil)
986
987         for name, data := range expect {
988                 _, err = persisted.Open("bogus-" + name)
989                 c.Check(err, check.NotNil)
990
991                 f, err := persisted.Open(name)
992                 c.Assert(err, check.IsNil)
993
994                 if data == nil {
995                         data = []byte{}
996                 }
997                 buf, err := ioutil.ReadAll(f)
998                 c.Check(err, check.IsNil)
999                 c.Check(buf, check.DeepEquals, data)
1000         }
1001
1002         expectDir := map[string]int{
1003                 "empty":           0,
1004                 "not empty":       1,
1005                 "not empty/empty": 0,
1006         }
1007         for name, expectLen := range expectDir {
1008                 _, err := persisted.Open(name + "/bogus")
1009                 c.Check(err, check.NotNil)
1010
1011                 d, err := persisted.Open(name)
1012                 defer d.Close()
1013                 c.Check(err, check.IsNil)
1014                 fi, err := d.Readdir(-1)
1015                 c.Check(err, check.IsNil)
1016                 c.Check(fi, check.HasLen, expectLen)
1017         }
1018 }
1019
1020 func (s *CollectionFSSuite) TestOpenFileFlags(c *check.C) {
1021         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1022         c.Assert(err, check.IsNil)
1023
1024         f, err := fs.OpenFile("missing", os.O_WRONLY, 0)
1025         c.Check(f, check.IsNil)
1026         c.Check(err, check.ErrorMatches, `file does not exist`)
1027
1028         f, err = fs.OpenFile("new", os.O_CREATE|os.O_RDONLY, 0)
1029         c.Assert(err, check.IsNil)
1030         defer f.Close()
1031         n, err := f.Write([]byte{1, 2, 3})
1032         c.Check(n, check.Equals, 0)
1033         c.Check(err, check.ErrorMatches, `read-only file`)
1034         n, err = f.Read(make([]byte, 1))
1035         c.Check(n, check.Equals, 0)
1036         c.Check(err, check.Equals, io.EOF)
1037         f, err = fs.OpenFile("new", os.O_RDWR, 0)
1038         c.Assert(err, check.IsNil)
1039         defer f.Close()
1040         _, err = f.Write([]byte{4, 5, 6})
1041         c.Check(err, check.IsNil)
1042         fi, err := f.Stat()
1043         c.Assert(err, check.IsNil)
1044         c.Check(fi.Size(), check.Equals, int64(3))
1045
1046         f, err = fs.OpenFile("new", os.O_TRUNC|os.O_RDWR, 0)
1047         c.Assert(err, check.IsNil)
1048         defer f.Close()
1049         pos, err := f.Seek(0, io.SeekEnd)
1050         c.Check(pos, check.Equals, int64(0))
1051         c.Check(err, check.IsNil)
1052         fi, err = f.Stat()
1053         c.Assert(err, check.IsNil)
1054         c.Check(fi.Size(), check.Equals, int64(0))
1055         fs.Remove("new")
1056
1057         buf := make([]byte, 64)
1058         f, err = fs.OpenFile("append", os.O_EXCL|os.O_CREATE|os.O_RDWR|os.O_APPEND, 0)
1059         c.Assert(err, check.IsNil)
1060         f.Write([]byte{1, 2, 3})
1061         f.Seek(0, io.SeekStart)
1062         n, _ = f.Read(buf[:1])
1063         c.Check(n, check.Equals, 1)
1064         c.Check(buf[:1], check.DeepEquals, []byte{1})
1065         pos, err = f.Seek(0, io.SeekCurrent)
1066         c.Assert(err, check.IsNil)
1067         c.Check(pos, check.Equals, int64(1))
1068         f.Write([]byte{4, 5, 6})
1069         pos, err = f.Seek(0, io.SeekCurrent)
1070         c.Assert(err, check.IsNil)
1071         c.Check(pos, check.Equals, int64(6))
1072         f.Seek(0, io.SeekStart)
1073         n, err = f.Read(buf)
1074         c.Check(buf[:n], check.DeepEquals, []byte{1, 2, 3, 4, 5, 6})
1075         c.Check(err, check.Equals, io.EOF)
1076         f.Close()
1077
1078         f, err = fs.OpenFile("append", os.O_RDWR|os.O_APPEND, 0)
1079         c.Assert(err, check.IsNil)
1080         pos, err = f.Seek(0, io.SeekCurrent)
1081         c.Check(pos, check.Equals, int64(0))
1082         c.Check(err, check.IsNil)
1083         f.Read(buf[:3])
1084         pos, _ = f.Seek(0, io.SeekCurrent)
1085         c.Check(pos, check.Equals, int64(3))
1086         f.Write([]byte{7, 8, 9})
1087         pos, err = f.Seek(0, io.SeekCurrent)
1088         c.Check(err, check.IsNil)
1089         c.Check(pos, check.Equals, int64(9))
1090         f.Close()
1091
1092         f, err = fs.OpenFile("wronly", os.O_CREATE|os.O_WRONLY, 0)
1093         c.Assert(err, check.IsNil)
1094         n, err = f.Write([]byte{3, 2, 1})
1095         c.Check(n, check.Equals, 3)
1096         c.Check(err, check.IsNil)
1097         pos, _ = f.Seek(0, io.SeekCurrent)
1098         c.Check(pos, check.Equals, int64(3))
1099         pos, _ = f.Seek(0, io.SeekStart)
1100         c.Check(pos, check.Equals, int64(0))
1101         n, err = f.Read(buf)
1102         c.Check(n, check.Equals, 0)
1103         c.Check(err, check.ErrorMatches, `.*O_WRONLY.*`)
1104         f, err = fs.OpenFile("wronly", os.O_RDONLY, 0)
1105         c.Assert(err, check.IsNil)
1106         n, _ = f.Read(buf)
1107         c.Check(buf[:n], check.DeepEquals, []byte{3, 2, 1})
1108
1109         f, err = fs.OpenFile("unsupported", os.O_CREATE|os.O_SYNC, 0)
1110         c.Check(f, check.IsNil)
1111         c.Check(err, check.NotNil)
1112
1113         f, err = fs.OpenFile("append", os.O_RDWR|os.O_WRONLY, 0)
1114         c.Check(f, check.IsNil)
1115         c.Check(err, check.ErrorMatches, `invalid flag.*`)
1116 }
1117
1118 func (s *CollectionFSSuite) TestFlushFullBlocksWritingLongFile(c *check.C) {
1119         defer func(cw, mbs int) {
1120                 concurrentWriters = cw
1121                 maxBlockSize = mbs
1122         }(concurrentWriters, maxBlockSize)
1123         concurrentWriters = 2
1124         maxBlockSize = 1024
1125
1126         proceed := make(chan struct{})
1127         var started, concurrent int32
1128         blk2done := false
1129         s.kc.onWrite = func([]byte) {
1130                 atomic.AddInt32(&concurrent, 1)
1131                 switch atomic.AddInt32(&started, 1) {
1132                 case 1:
1133                         // Wait until block 2 starts and finishes, and block 3 starts
1134                         select {
1135                         case <-proceed:
1136                                 c.Check(blk2done, check.Equals, true)
1137                         case <-time.After(time.Second):
1138                                 c.Error("timed out")
1139                         }
1140                 case 2:
1141                         time.Sleep(time.Millisecond)
1142                         blk2done = true
1143                 case 3:
1144                         close(proceed)
1145                 default:
1146                         time.Sleep(time.Millisecond)
1147                 }
1148                 c.Check(atomic.AddInt32(&concurrent, -1) < int32(concurrentWriters), check.Equals, true)
1149         }
1150
1151         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1152         c.Assert(err, check.IsNil)
1153         f, err := fs.OpenFile("50K", os.O_WRONLY|os.O_CREATE, 0)
1154         c.Assert(err, check.IsNil)
1155         defer f.Close()
1156
1157         data := make([]byte, 500)
1158         rand.Read(data)
1159
1160         for i := 0; i < 100; i++ {
1161                 n, err := f.Write(data)
1162                 c.Assert(n, check.Equals, len(data))
1163                 c.Assert(err, check.IsNil)
1164         }
1165
1166         currentMemExtents := func() (memExtents []int) {
1167                 for idx, e := range f.(*filehandle).inode.(*filenode).segments {
1168                         switch e.(type) {
1169                         case *memSegment:
1170                                 memExtents = append(memExtents, idx)
1171                         }
1172                 }
1173                 return
1174         }
1175         f.(*filehandle).inode.(*filenode).waitPrune()
1176         c.Check(currentMemExtents(), check.HasLen, 1)
1177
1178         m, err := fs.MarshalManifest(".")
1179         c.Check(m, check.Matches, `[^:]* 0:50000:50K\n`)
1180         c.Check(err, check.IsNil)
1181         c.Check(currentMemExtents(), check.HasLen, 0)
1182 }
1183
1184 // Ensure blocks get flushed to disk if a lot of data is written to
1185 // small files/directories without calling sync().
1186 //
1187 // Write four 512KiB files into each of 256 top-level dirs (total
1188 // 512MiB), calling Flush() every 8 dirs. Ensure memory usage never
1189 // exceeds 24MiB (4 concurrentWriters * 2MiB + 8 unflushed dirs *
1190 // 2MiB).
1191 func (s *CollectionFSSuite) TestFlushAll(c *check.C) {
1192         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1193         c.Assert(err, check.IsNil)
1194
1195         s.kc.onWrite = func([]byte) {
1196                 // discard flushed data -- otherwise the stub will use
1197                 // unlimited memory
1198                 time.Sleep(time.Millisecond)
1199                 s.kc.Lock()
1200                 defer s.kc.Unlock()
1201                 s.kc.blocks = map[string][]byte{}
1202         }
1203         for i := 0; i < 256; i++ {
1204                 buf := bytes.NewBuffer(make([]byte, 524288))
1205                 fmt.Fprintf(buf, "test file in dir%d", i)
1206
1207                 dir := fmt.Sprintf("dir%d", i)
1208                 fs.Mkdir(dir, 0755)
1209                 for j := 0; j < 2; j++ {
1210                         f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
1211                         c.Assert(err, check.IsNil)
1212                         defer f.Close()
1213                         _, err = io.Copy(f, buf)
1214                         c.Assert(err, check.IsNil)
1215                 }
1216
1217                 if i%8 == 0 {
1218                         fs.Flush("", true)
1219                 }
1220
1221                 size := fs.MemorySize()
1222                 if !c.Check(size <= 1<<24, check.Equals, true) {
1223                         c.Logf("at dir%d fs.MemorySize()=%d", i, size)
1224                         return
1225                 }
1226         }
1227 }
1228
1229 // Ensure short blocks at the end of a stream don't get flushed by
1230 // Flush(false).
1231 //
1232 // Write 67x 1MiB files to each of 8 dirs, and check that 8 full 64MiB
1233 // blocks have been flushed while 8x 3MiB is still buffered in memory.
1234 func (s *CollectionFSSuite) TestFlushFullBlocksOnly(c *check.C) {
1235         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1236         c.Assert(err, check.IsNil)
1237
1238         var flushed int64
1239         s.kc.onWrite = func(p []byte) {
1240                 atomic.AddInt64(&flushed, int64(len(p)))
1241         }
1242
1243         nDirs := int64(8)
1244         nFiles := int64(67)
1245         megabyte := make([]byte, 1<<20)
1246         for i := int64(0); i < nDirs; i++ {
1247                 dir := fmt.Sprintf("dir%d", i)
1248                 fs.Mkdir(dir, 0755)
1249                 for j := int64(0); j < nFiles; j++ {
1250                         f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
1251                         c.Assert(err, check.IsNil)
1252                         defer f.Close()
1253                         _, err = f.Write(megabyte)
1254                         c.Assert(err, check.IsNil)
1255                 }
1256         }
1257         inodebytes := int64((nDirs*(nFiles+1) + 1) * 64)
1258         c.Check(fs.MemorySize(), check.Equals, nDirs*nFiles*(1<<20+64)+inodebytes)
1259         c.Check(flushed, check.Equals, int64(0))
1260
1261         waitForFlush := func(expectUnflushed, expectFlushed int64) {
1262                 for deadline := time.Now().Add(5 * time.Second); fs.MemorySize() > expectUnflushed && time.Now().Before(deadline); time.Sleep(10 * time.Millisecond) {
1263                 }
1264                 c.Check(fs.MemorySize(), check.Equals, expectUnflushed)
1265                 c.Check(flushed, check.Equals, expectFlushed)
1266         }
1267
1268         // Nothing flushed yet
1269         waitForFlush(nDirs*nFiles*(1<<20+64)+inodebytes, 0)
1270
1271         // Flushing a non-empty dir "/" is non-recursive and there are
1272         // no top-level files, so this has no effect
1273         fs.Flush("/", false)
1274         waitForFlush(nDirs*nFiles*(1<<20+64)+inodebytes, 0)
1275
1276         // Flush the full block in dir0
1277         fs.Flush("dir0", false)
1278         bigloclen := int64(32 + 9 + 51 + 64) // md5 + "+" + "67xxxxxx" + "+Axxxxxx..." + 64 (see (storedSegment)memorySize)
1279         waitForFlush((nDirs*nFiles-64)*(1<<20+64)+inodebytes+bigloclen*64, 64<<20)
1280
1281         err = fs.Flush("dir-does-not-exist", false)
1282         c.Check(err, check.NotNil)
1283
1284         // Flush full blocks in all dirs
1285         fs.Flush("", false)
1286         waitForFlush(nDirs*3*(1<<20+64)+inodebytes+bigloclen*64*nDirs, nDirs*64<<20)
1287
1288         // Flush non-full blocks, too
1289         fs.Flush("", true)
1290         smallloclen := int64(32 + 8 + 51 + 64) // md5 + "+" + "3xxxxxx" + "+Axxxxxx..." + 64 (see (storedSegment)memorySize)
1291         waitForFlush(inodebytes+bigloclen*64*nDirs+smallloclen*3*nDirs, nDirs*67<<20)
1292 }
1293
1294 // Even when writing lots of files/dirs from different goroutines, as
1295 // long as Flush(dir,false) is called after writing each file,
1296 // unflushed data should be limited to one full block per
1297 // concurrentWriter, plus one nearly-full block at the end of each
1298 // dir/stream.
1299 func (s *CollectionFSSuite) TestMaxUnflushed(c *check.C) {
1300         nDirs := int64(8)
1301         maxUnflushed := (int64(concurrentWriters) + nDirs) << 26
1302
1303         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1304         c.Assert(err, check.IsNil)
1305
1306         release := make(chan struct{})
1307         timeout := make(chan struct{})
1308         time.AfterFunc(10*time.Second, func() { close(timeout) })
1309         var putCount, concurrency int64
1310         var unflushed int64
1311         s.kc.onWrite = func(p []byte) {
1312                 defer atomic.AddInt64(&unflushed, -int64(len(p)))
1313                 cur := atomic.AddInt64(&concurrency, 1)
1314                 defer atomic.AddInt64(&concurrency, -1)
1315                 pc := atomic.AddInt64(&putCount, 1)
1316                 if pc < int64(concurrentWriters) {
1317                         // Block until we reach concurrentWriters, to
1318                         // make sure we're really accepting concurrent
1319                         // writes.
1320                         select {
1321                         case <-release:
1322                         case <-timeout:
1323                                 c.Error("timeout")
1324                         }
1325                 } else if pc == int64(concurrentWriters) {
1326                         // Unblock the first N-1 PUT reqs.
1327                         close(release)
1328                 }
1329                 c.Assert(cur <= int64(concurrentWriters), check.Equals, true)
1330                 c.Assert(atomic.LoadInt64(&unflushed) <= maxUnflushed, check.Equals, true)
1331         }
1332
1333         var owg sync.WaitGroup
1334         megabyte := make([]byte, 1<<20)
1335         for i := int64(0); i < nDirs; i++ {
1336                 dir := fmt.Sprintf("dir%d", i)
1337                 fs.Mkdir(dir, 0755)
1338                 owg.Add(1)
1339                 go func() {
1340                         defer owg.Done()
1341                         defer fs.Flush(dir, true)
1342                         var iwg sync.WaitGroup
1343                         defer iwg.Wait()
1344                         for j := 0; j < 67; j++ {
1345                                 iwg.Add(1)
1346                                 go func(j int) {
1347                                         defer iwg.Done()
1348                                         f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
1349                                         c.Assert(err, check.IsNil)
1350                                         defer f.Close()
1351                                         n, err := f.Write(megabyte)
1352                                         c.Assert(err, check.IsNil)
1353                                         atomic.AddInt64(&unflushed, int64(n))
1354                                         fs.Flush(dir, false)
1355                                 }(j)
1356                         }
1357                 }()
1358         }
1359         owg.Wait()
1360         fs.Flush("", true)
1361 }
1362
1363 func (s *CollectionFSSuite) TestFlushStress(c *check.C) {
1364         done := false
1365         defer func() { done = true }()
1366         time.AfterFunc(10*time.Second, func() {
1367                 if !done {
1368                         pprof.Lookup("goroutine").WriteTo(os.Stderr, 1)
1369                         panic("timeout")
1370                 }
1371         })
1372
1373         wrote := 0
1374         s.kc.onWrite = func(p []byte) {
1375                 s.kc.Lock()
1376                 s.kc.blocks = map[string][]byte{}
1377                 wrote++
1378                 defer c.Logf("wrote block %d, %d bytes", wrote, len(p))
1379                 s.kc.Unlock()
1380                 time.Sleep(20 * time.Millisecond)
1381         }
1382
1383         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1384         c.Assert(err, check.IsNil)
1385
1386         data := make([]byte, 1<<20)
1387         for i := 0; i < 3; i++ {
1388                 dir := fmt.Sprintf("dir%d", i)
1389                 fs.Mkdir(dir, 0755)
1390                 for j := 0; j < 200; j++ {
1391                         data[0] = byte(j)
1392                         f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
1393                         c.Assert(err, check.IsNil)
1394                         _, err = f.Write(data)
1395                         c.Assert(err, check.IsNil)
1396                         f.Close()
1397                         fs.Flush(dir, false)
1398                 }
1399                 _, err := fs.MarshalManifest(".")
1400                 c.Check(err, check.IsNil)
1401         }
1402 }
1403
1404 func (s *CollectionFSSuite) TestFlushShort(c *check.C) {
1405         s.kc.onWrite = func([]byte) {
1406                 s.kc.Lock()
1407                 s.kc.blocks = map[string][]byte{}
1408                 s.kc.Unlock()
1409         }
1410         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1411         c.Assert(err, check.IsNil)
1412         for _, blocksize := range []int{8, 1000000} {
1413                 dir := fmt.Sprintf("dir%d", blocksize)
1414                 err = fs.Mkdir(dir, 0755)
1415                 c.Assert(err, check.IsNil)
1416                 data := make([]byte, blocksize)
1417                 for i := 0; i < 100; i++ {
1418                         f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, i), os.O_WRONLY|os.O_CREATE, 0)
1419                         c.Assert(err, check.IsNil)
1420                         _, err = f.Write(data)
1421                         c.Assert(err, check.IsNil)
1422                         f.Close()
1423                         fs.Flush(dir, false)
1424                 }
1425                 fs.Flush(dir, true)
1426                 _, err := fs.MarshalManifest(".")
1427                 c.Check(err, check.IsNil)
1428         }
1429 }
1430
1431 func (s *CollectionFSSuite) TestBrokenManifests(c *check.C) {
1432         for _, txt := range []string{
1433                 "\n",
1434                 ".\n",
1435                 ". \n",
1436                 ". d41d8cd98f00b204e9800998ecf8427e+0\n",
1437                 ". d41d8cd98f00b204e9800998ecf8427e+0 \n",
1438                 ". 0:0:foo\n",
1439                 ".  0:0:foo\n",
1440                 ". 0:0:foo 0:0:bar\n",
1441                 ". d41d8cd98f00b204e9800998ecf8427e 0:0:foo\n",
1442                 ". d41d8cd98f00b204e9800998ecf8427e+0 :0:0:foo\n",
1443                 ". d41d8cd98f00b204e9800998ecf8427e+0 foo:0:foo\n",
1444                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:foo:foo\n",
1445                 ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:foo 1:1:bar\n",
1446                 ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:\\056\n",
1447                 ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:\\056\\057\\056\n",
1448                 ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:.\n",
1449                 ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:..\n",
1450                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:..\n",
1451                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/..\n",
1452                 ". d41d8cd98f00b204e9800998ecf8427e+1 0:0:foo\n./foo d41d8cd98f00b204e9800998ecf8427e+1 0:0:bar\n",
1453                 "./foo d41d8cd98f00b204e9800998ecf8427e+1 0:0:bar\n. d41d8cd98f00b204e9800998ecf8427e+1 0:0:foo\n",
1454         } {
1455                 c.Logf("<-%q", txt)
1456                 fs, err := (&Collection{ManifestText: txt}).FileSystem(s.client, s.kc)
1457                 c.Check(fs, check.IsNil)
1458                 c.Logf("-> %s", err)
1459                 c.Check(err, check.NotNil)
1460         }
1461 }
1462
1463 func (s *CollectionFSSuite) TestEdgeCaseManifests(c *check.C) {
1464         for _, txt := range []string{
1465                 "",
1466                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo\n",
1467                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:...\n",
1468                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:. 0:0:. 0:0:\\056 0:0:\\056\n",
1469                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/. 0:0:. 0:0:foo\\057bar\\057\\056\n",
1470                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo 0:0:foo 0:0:bar\n",
1471                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/bar\n./foo d41d8cd98f00b204e9800998ecf8427e+0 0:0:bar\n",
1472         } {
1473                 c.Logf("<-%q", txt)
1474                 fs, err := (&Collection{ManifestText: txt}).FileSystem(s.client, s.kc)
1475                 c.Check(err, check.IsNil)
1476                 c.Check(fs, check.NotNil)
1477         }
1478 }
1479
1480 var fakeLocator = func() []string {
1481         locs := make([]string, 10)
1482         for i := range locs {
1483                 locs[i] = fmt.Sprintf("%x+%d", md5.Sum(make([]byte, i)), i)
1484                 if i%2 == 1 {
1485                         locs[i] += "+Awhatever+Zotherhints"
1486                 }
1487         }
1488         return locs
1489 }()
1490
1491 func (s *CollectionFSSuite) TestReplaceSegments_HappyPath(c *check.C) {
1492         fs, err := (&Collection{
1493                 ManifestText: ". " + fakeLocator[1] + " " + fakeLocator[2] + " 0:3:file3\n",
1494         }).FileSystem(nil, &keepClientStub{})
1495         c.Assert(err, check.IsNil)
1496         changed, err := fs.ReplaceSegments(map[BlockSegment]BlockSegment{
1497                 BlockSegment{fakeLocator[1], 0, 1}: BlockSegment{fakeLocator[3], 0, 1},
1498                 BlockSegment{fakeLocator[2], 0, 2}: BlockSegment{fakeLocator[3], 1, 2},
1499         })
1500         c.Check(changed, check.Equals, true)
1501         c.Check(err, check.IsNil)
1502         mtxt, err := fs.MarshalManifest(".")
1503         c.Check(err, check.IsNil)
1504         c.Check(mtxt, check.Equals, ". "+fakeLocator[3]+" 0:3:file3\n")
1505 }
1506
1507 func (s *CollectionFSSuite) TestReplaceSegments_InvalidOffset(c *check.C) {
1508         origtxt := ". " + fakeLocator[1] + " " + fakeLocator[2] + " 0:3:file3\n"
1509         fs, err := (&Collection{
1510                 ManifestText: origtxt,
1511         }).FileSystem(nil, &keepClientStub{})
1512         c.Assert(err, check.IsNil)
1513         changed, err := fs.ReplaceSegments(map[BlockSegment]BlockSegment{
1514                 BlockSegment{fakeLocator[1], 0, 1}: BlockSegment{fakeLocator[3], 0, 1},
1515                 BlockSegment{fakeLocator[2], 0, 2}: BlockSegment{fakeLocator[3], 2, 2},
1516         })
1517         c.Check(changed, check.Equals, false)
1518         c.Check(err, check.ErrorMatches, `invalid replacement: offset 2 \+ length 2 > block size 3`)
1519         mtxt, err := fs.MarshalManifest(".")
1520         c.Check(err, check.IsNil)
1521         c.Check(mtxt, check.Equals, origtxt)
1522 }
1523
1524 func (s *CollectionFSSuite) TestReplaceSegments_LengthMismatch(c *check.C) {
1525         origtxt := ". " + fakeLocator[1] + " " + fakeLocator[2] + " 0:3:file3\n"
1526         fs, err := (&Collection{
1527                 ManifestText: origtxt,
1528         }).FileSystem(nil, &keepClientStub{})
1529         c.Assert(err, check.IsNil)
1530         changed, err := fs.ReplaceSegments(map[BlockSegment]BlockSegment{
1531                 BlockSegment{fakeLocator[1], 0, 1}: BlockSegment{fakeLocator[3], 0, 1},
1532                 BlockSegment{fakeLocator[2], 0, 2}: BlockSegment{fakeLocator[3], 0, 3},
1533         })
1534         c.Check(changed, check.Equals, false)
1535         c.Check(err, check.ErrorMatches, `mismatched length: replacing segment length 2 with segment length 3`)
1536         mtxt, err := fs.MarshalManifest(".")
1537         c.Check(err, check.IsNil)
1538         c.Check(mtxt, check.Equals, origtxt)
1539 }
1540
1541 func (s *CollectionFSSuite) TestReplaceSegments_SkipUnreferenced(c *check.C) {
1542         fs, err := (&Collection{
1543                 ManifestText: ". " + fakeLocator[1] + " " + fakeLocator[2] + " " + fakeLocator[3] + " 0:6:file6\n",
1544         }).FileSystem(nil, &keepClientStub{})
1545         c.Assert(err, check.IsNil)
1546         changed, err := fs.ReplaceSegments(map[BlockSegment]BlockSegment{
1547                 BlockSegment{fakeLocator[1], 0, 1}: BlockSegment{fakeLocator[4], 0, 1}, // skipped because [5] unref
1548                 BlockSegment{fakeLocator[2], 0, 2}: BlockSegment{fakeLocator[4], 1, 2}, // skipped because [5] unref
1549                 BlockSegment{fakeLocator[5], 0, 2}: BlockSegment{fakeLocator[4], 1, 2}, // [5] unreferenced in orig manifest
1550                 BlockSegment{fakeLocator[3], 0, 3}: BlockSegment{fakeLocator[6], 3, 3}, // applied
1551         })
1552         c.Check(changed, check.Equals, true)
1553         c.Check(err, check.IsNil)
1554         mtxt, err := fs.MarshalManifest(".")
1555         c.Check(err, check.IsNil)
1556         c.Check(mtxt, check.Equals, ". "+fakeLocator[1]+" "+fakeLocator[2]+" "+fakeLocator[6]+" 0:3:file6 6:3:file6\n")
1557 }
1558
1559 func (s *CollectionFSSuite) TestReplaceSegments_SkipIncompleteSegment(c *check.C) {
1560         origtxt := ". " + fakeLocator[2] + " " + fakeLocator[3] + " 0:5:file5\n"
1561         fs, err := (&Collection{
1562                 ManifestText: origtxt,
1563         }).FileSystem(nil, &keepClientStub{})
1564         c.Assert(err, check.IsNil)
1565         changed, err := fs.ReplaceSegments(map[BlockSegment]BlockSegment{
1566                 BlockSegment{fakeLocator[2], 0, 1}: BlockSegment{fakeLocator[4], 0, 1}, // length=1 does not match the length=2 segment
1567         })
1568         c.Check(changed, check.Equals, false)
1569         c.Check(err, check.IsNil)
1570         mtxt, err := fs.MarshalManifest(".")
1571         c.Check(err, check.IsNil)
1572         c.Check(mtxt, check.Equals, origtxt)
1573 }
1574
1575 func (s *CollectionFSSuite) TestSnapshotSplice(c *check.C) {
1576         filedata1 := "hello snapshot+splice world\n"
1577         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1578         c.Assert(err, check.IsNil)
1579         {
1580                 f, err := fs.OpenFile("file1", os.O_CREATE|os.O_RDWR, 0700)
1581                 c.Assert(err, check.IsNil)
1582                 _, err = f.Write([]byte(filedata1))
1583                 c.Assert(err, check.IsNil)
1584                 err = f.Close()
1585                 c.Assert(err, check.IsNil)
1586         }
1587
1588         snap, err := Snapshot(fs, "/")
1589         c.Assert(err, check.IsNil)
1590         err = Splice(fs, "dir1", snap)
1591         c.Assert(err, check.IsNil)
1592         f, err := fs.Open("dir1/file1")
1593         c.Assert(err, check.IsNil)
1594         buf, err := io.ReadAll(f)
1595         c.Assert(err, check.IsNil)
1596         c.Check(string(buf), check.Equals, filedata1)
1597 }
1598
1599 func (s *CollectionFSSuite) TestRefreshSignatures(c *check.C) {
1600         filedata1 := "hello refresh signatures world\n"
1601         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1602         c.Assert(err, check.IsNil)
1603         fs.Mkdir("d1", 0700)
1604         f, err := fs.OpenFile("d1/file1", os.O_CREATE|os.O_RDWR, 0700)
1605         c.Assert(err, check.IsNil)
1606         _, err = f.Write([]byte(filedata1))
1607         c.Assert(err, check.IsNil)
1608         err = f.Close()
1609         c.Assert(err, check.IsNil)
1610
1611         filedata2 := "hello refresh signatures universe\n"
1612         fs.Mkdir("d2", 0700)
1613         f, err = fs.OpenFile("d2/file2", os.O_CREATE|os.O_RDWR, 0700)
1614         c.Assert(err, check.IsNil)
1615         _, err = f.Write([]byte(filedata2))
1616         c.Assert(err, check.IsNil)
1617         err = f.Close()
1618         c.Assert(err, check.IsNil)
1619         txt, err := fs.MarshalManifest(".")
1620         c.Assert(err, check.IsNil)
1621         var saved Collection
1622         err = s.client.RequestAndDecode(&saved, "POST", "arvados/v1/collections", nil, map[string]interface{}{
1623                 "select": []string{"manifest_text", "uuid", "portable_data_hash"},
1624                 "collection": map[string]interface{}{
1625                         "manifest_text": txt,
1626                 },
1627         })
1628         c.Assert(err, check.IsNil)
1629
1630         // Update signatures synchronously if they are already expired
1631         // when Read() is called.
1632         {
1633                 saved.ManifestText = SignManifest(saved.ManifestText, s.kc.authToken, time.Now().Add(-2*time.Second), s.kc.sigttl, []byte(s.kc.sigkey))
1634                 fs, err := saved.FileSystem(s.client, s.kc)
1635                 c.Assert(err, check.IsNil)
1636                 f, err := fs.OpenFile("d1/file1", os.O_RDONLY, 0)
1637                 c.Assert(err, check.IsNil)
1638                 buf, err := ioutil.ReadAll(f)
1639                 c.Check(err, check.IsNil)
1640                 c.Check(string(buf), check.Equals, filedata1)
1641         }
1642
1643         // Update signatures asynchronously if we're more than half
1644         // way to TTL when Read() is called.
1645         {
1646                 exp := time.Now().Add(2 * time.Minute)
1647                 saved.ManifestText = SignManifest(saved.ManifestText, s.kc.authToken, exp, s.kc.sigttl, []byte(s.kc.sigkey))
1648                 fs, err := saved.FileSystem(s.client, s.kc)
1649                 c.Assert(err, check.IsNil)
1650                 f1, err := fs.OpenFile("d1/file1", os.O_RDONLY, 0)
1651                 c.Assert(err, check.IsNil)
1652                 f2, err := fs.OpenFile("d2/file2", os.O_RDONLY, 0)
1653                 c.Assert(err, check.IsNil)
1654                 buf, err := ioutil.ReadAll(f1)
1655                 c.Check(err, check.IsNil)
1656                 c.Check(string(buf), check.Equals, filedata1)
1657
1658                 // Ensure fs treats the 2-minute TTL as less than half
1659                 // the server's signing TTL. If we don't do this,
1660                 // collectionfs will guess the signature is fresh,
1661                 // i.e., signing TTL is 2 minutes, and won't do an
1662                 // async refresh.
1663                 fs.(*collectionFileSystem).guessSignatureTTL = time.Hour
1664
1665                 refreshed := false
1666                 for deadline := time.Now().Add(time.Second * 10); time.Now().Before(deadline) && !refreshed; time.Sleep(time.Second / 10) {
1667                         _, err = f1.Seek(0, io.SeekStart)
1668                         c.Assert(err, check.IsNil)
1669                         buf, err = ioutil.ReadAll(f1)
1670                         c.Assert(err, check.IsNil)
1671                         c.Assert(string(buf), check.Equals, filedata1)
1672                         loc := s.kc.reads[len(s.kc.reads)-1]
1673                         t, err := signatureExpiryTime(loc)
1674                         c.Assert(err, check.IsNil)
1675                         c.Logf("last read block %s had signature expiry time %v", loc, t)
1676                         if t.Sub(time.Now()) > time.Hour {
1677                                 refreshed = true
1678                         }
1679                 }
1680                 c.Check(refreshed, check.Equals, true)
1681
1682                 // Second locator should have been updated at the same
1683                 // time.
1684                 buf, err = ioutil.ReadAll(f2)
1685                 c.Assert(err, check.IsNil)
1686                 c.Assert(string(buf), check.Equals, filedata2)
1687                 loc := s.kc.reads[len(s.kc.reads)-1]
1688                 c.Check(loc, check.Not(check.Equals), s.kc.reads[len(s.kc.reads)-2])
1689                 t, err := signatureExpiryTime(s.kc.reads[len(s.kc.reads)-1])
1690                 c.Assert(err, check.IsNil)
1691                 c.Logf("last read block %s had signature expiry time %v", loc, t)
1692                 c.Check(t.Sub(time.Now()) > time.Hour, check.Equals, true)
1693         }
1694 }
1695
1696 var bigmanifest = func() string {
1697         var buf bytes.Buffer
1698         for i := 0; i < 2000; i++ {
1699                 fmt.Fprintf(&buf, "./dir%d", i)
1700                 for i := 0; i < 100; i++ {
1701                         fmt.Fprintf(&buf, " d41d8cd98f00b204e9800998ecf8427e+99999")
1702                 }
1703                 for i := 0; i < 2000; i++ {
1704                         fmt.Fprintf(&buf, " 1200000:300000:file%d", i)
1705                 }
1706                 fmt.Fprintf(&buf, "\n")
1707         }
1708         return buf.String()
1709 }()
1710
1711 func (s *CollectionFSSuite) BenchmarkParseManifest(c *check.C) {
1712         DebugLocksPanicMode = false
1713         c.Logf("test manifest is %d bytes", len(bigmanifest))
1714         for i := 0; i < c.N; i++ {
1715                 fs, err := (&Collection{ManifestText: bigmanifest}).FileSystem(s.client, s.kc)
1716                 c.Check(err, check.IsNil)
1717                 c.Check(fs, check.NotNil)
1718         }
1719 }
1720
1721 func (s *CollectionFSSuite) checkMemSize(c *check.C, f File) {
1722         fn := f.(*filehandle).inode.(*filenode)
1723         var memsize int64
1724         for _, seg := range fn.segments {
1725                 if e, ok := seg.(*memSegment); ok {
1726                         memsize += int64(len(e.buf))
1727                 }
1728         }
1729         c.Check(fn.memsize, check.Equals, memsize)
1730 }
1731
1732 type CollectionFSUnitSuite struct{}
1733
1734 var _ = check.Suite(&CollectionFSUnitSuite{})
1735
1736 // expect ~2 seconds to load a manifest with 256K files
1737 func (s *CollectionFSUnitSuite) TestLargeManifest_ManyFiles(c *check.C) {
1738         if testing.Short() {
1739                 c.Skip("slow")
1740         }
1741         s.testLargeManifest(c, 512, 512, 1, 0)
1742 }
1743
1744 func (s *CollectionFSUnitSuite) TestLargeManifest_LargeFiles(c *check.C) {
1745         if testing.Short() {
1746                 c.Skip("slow")
1747         }
1748         s.testLargeManifest(c, 1, 800, 1000, 0)
1749 }
1750
1751 func (s *CollectionFSUnitSuite) TestLargeManifest_InterleavedFiles(c *check.C) {
1752         if testing.Short() {
1753                 c.Skip("slow")
1754         }
1755         // Timing figures here are from a dev host, (0)->(1)->(2)->(3)
1756         // (0) no optimizations (main branch commit ea697fb1e8)
1757         // (1) resolve streampos->blkidx with binary search
1758         // (2) ...and rewrite PortableDataHash() without regexp
1759         // (3) ...and use fnodeCache in loadManifest
1760         s.testLargeManifest(c, 1, 800, 100, 4<<20) // 127s    -> 12s  -> 2.5s -> 1.5s
1761         s.testLargeManifest(c, 1, 50, 1000, 4<<20) // 44s     -> 10s  -> 1.5s -> 0.8s
1762         s.testLargeManifest(c, 1, 200, 100, 4<<20) // 13s     -> 4s   -> 0.6s -> 0.3s
1763         s.testLargeManifest(c, 1, 200, 150, 4<<20) // 26s     -> 4s   -> 1s   -> 0.5s
1764         s.testLargeManifest(c, 1, 200, 200, 4<<20) // 38s     -> 6s   -> 1.3s -> 0.7s
1765         s.testLargeManifest(c, 1, 200, 225, 4<<20) // 46s     -> 7s   -> 1.5s -> 1s
1766         s.testLargeManifest(c, 1, 400, 400, 4<<20) // 477s    -> 24s  -> 5s   -> 3s
1767         // s.testLargeManifest(c, 1, 800, 1000, 4<<20) // timeout -> 186s -> 28s  -> 17s
1768 }
1769
1770 func (s *CollectionFSUnitSuite) testLargeManifest(c *check.C, dirCount, filesPerDir, blocksPerFile, interleaveChunk int) {
1771         t0 := time.Now()
1772         const blksize = 1 << 26
1773         c.Logf("%s building manifest with dirCount=%d filesPerDir=%d blocksPerFile=%d", time.Now(), dirCount, filesPerDir, blocksPerFile)
1774         mb := bytes.NewBuffer(make([]byte, 0, 40000000))
1775         blkid := 0
1776         for i := 0; i < dirCount; i++ {
1777                 fmt.Fprintf(mb, "./dir%d", i)
1778                 for j := 0; j < filesPerDir; j++ {
1779                         for k := 0; k < blocksPerFile; k++ {
1780                                 blkid++
1781                                 fmt.Fprintf(mb, " %032x+%d+A%040x@%08x", blkid, blksize, blkid, blkid)
1782                         }
1783                 }
1784                 for j := 0; j < filesPerDir; j++ {
1785                         if interleaveChunk == 0 {
1786                                 fmt.Fprintf(mb, " %d:%d:dir%d/file%d", (filesPerDir-j-1)*blocksPerFile*blksize, blocksPerFile*blksize, j, j)
1787                                 continue
1788                         }
1789                         for todo := int64(blocksPerFile) * int64(blksize); todo > 0; todo -= int64(interleaveChunk) {
1790                                 size := int64(interleaveChunk)
1791                                 if size > todo {
1792                                         size = todo
1793                                 }
1794                                 offset := rand.Int63n(int64(blocksPerFile)*int64(blksize)*int64(filesPerDir) - size)
1795                                 fmt.Fprintf(mb, " %d:%d:dir%d/file%d", offset, size, j, j)
1796                         }
1797                 }
1798                 mb.Write([]byte{'\n'})
1799         }
1800         coll := Collection{ManifestText: mb.String()}
1801         c.Logf("%s built manifest size=%d", time.Now(), mb.Len())
1802
1803         var memstats runtime.MemStats
1804         runtime.ReadMemStats(&memstats)
1805         c.Logf("%s Alloc=%d Sys=%d", time.Now(), memstats.Alloc, memstats.Sys)
1806
1807         f, err := coll.FileSystem(NewClientFromEnv(), &keepClientStub{})
1808         c.Check(err, check.IsNil)
1809         c.Logf("%s loaded", time.Now())
1810         c.Check(f.Size(), check.Equals, int64(dirCount*filesPerDir*blocksPerFile*blksize))
1811
1812         // Stat() and OpenFile() each file. This mimics the behavior
1813         // of webdav propfind, which opens each file even when just
1814         // listing directory entries.
1815         for i := 0; i < dirCount; i++ {
1816                 for j := 0; j < filesPerDir; j++ {
1817                         fnm := fmt.Sprintf("./dir%d/dir%d/file%d", i, j, j)
1818                         fi, err := f.Stat(fnm)
1819                         c.Assert(err, check.IsNil)
1820                         c.Check(fi.IsDir(), check.Equals, false)
1821                         f, err := f.OpenFile(fnm, os.O_RDONLY, 0)
1822                         c.Assert(err, check.IsNil)
1823                         f.Close()
1824                 }
1825         }
1826         c.Logf("%s OpenFile() x %d", time.Now(), dirCount*filesPerDir)
1827
1828         runtime.ReadMemStats(&memstats)
1829         c.Logf("%s Alloc=%d Sys=%d", time.Now(), memstats.Alloc, memstats.Sys)
1830         c.Logf("%s MemorySize=%d", time.Now(), f.MemorySize())
1831         c.Logf("%s ... test duration %s", time.Now(), time.Now().Sub(t0))
1832 }
1833
1834 // Gocheck boilerplate
1835 func Test(t *testing.T) {
1836         check.TestingT(t)
1837 }