18051: Add manifest-parsing / fs-building benchmark.
[arvados.git] / sdk / go / arvados / fs_collection_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package arvados
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "errors"
12         "fmt"
13         "io"
14         "io/ioutil"
15         "math/rand"
16         "net/http"
17         "os"
18         "regexp"
19         "runtime"
20         "runtime/pprof"
21         "strings"
22         "sync"
23         "sync/atomic"
24         "testing"
25         "time"
26
27         check "gopkg.in/check.v1"
28 )
29
30 var _ = check.Suite(&CollectionFSSuite{})
31
32 type keepClientStub struct {
33         blocks      map[string][]byte
34         refreshable map[string]bool
35         onWrite     func(bufcopy []byte) // called from WriteBlock, before acquiring lock
36         authToken   string               // client's auth token (used for signing locators)
37         sigkey      string               // blob signing key
38         sigttl      time.Duration        // blob signing ttl
39         sync.RWMutex
40 }
41
42 var errStub404 = errors.New("404 block not found")
43
44 func (kcs *keepClientStub) ReadAt(locator string, p []byte, off int) (int, error) {
45         kcs.RLock()
46         defer kcs.RUnlock()
47         buf := kcs.blocks[locator[:32]]
48         if buf == nil {
49                 return 0, errStub404
50         }
51         return copy(p, buf[off:]), nil
52 }
53
54 func (kcs *keepClientStub) BlockWrite(_ context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) {
55         if opts.Data == nil {
56                 panic("oops, stub is not made for this")
57         }
58         locator := SignLocator(fmt.Sprintf("%x+%d", md5.Sum(opts.Data), len(opts.Data)), kcs.authToken, time.Now().Add(kcs.sigttl), kcs.sigttl, []byte(kcs.sigkey))
59         buf := make([]byte, len(opts.Data))
60         copy(buf, opts.Data)
61         if kcs.onWrite != nil {
62                 kcs.onWrite(buf)
63         }
64         for _, sc := range opts.StorageClasses {
65                 if sc != "default" {
66                         return BlockWriteResponse{}, fmt.Errorf("stub does not write storage class %q", sc)
67                 }
68         }
69         kcs.Lock()
70         defer kcs.Unlock()
71         kcs.blocks[locator[:32]] = buf
72         return BlockWriteResponse{Locator: locator, Replicas: 1}, nil
73 }
74
75 var reRemoteSignature = regexp.MustCompile(`\+[AR][^+]*`)
76
77 func (kcs *keepClientStub) LocalLocator(locator string) (string, error) {
78         if strings.Contains(locator, "+A") {
79                 return locator, nil
80         }
81         kcs.Lock()
82         defer kcs.Unlock()
83         if strings.Contains(locator, "+R") {
84                 if len(locator) < 32 {
85                         return "", fmt.Errorf("bad locator: %q", locator)
86                 }
87                 if _, ok := kcs.blocks[locator[:32]]; !ok && !kcs.refreshable[locator[:32]] {
88                         return "", fmt.Errorf("kcs.refreshable[%q]==false", locator)
89                 }
90         }
91         locator = reRemoteSignature.ReplaceAllLiteralString(locator, "")
92         locator = SignLocator(locator, kcs.authToken, time.Now().Add(kcs.sigttl), kcs.sigttl, []byte(kcs.sigkey))
93         return locator, nil
94 }
95
96 type CollectionFSSuite struct {
97         client *Client
98         coll   Collection
99         fs     CollectionFileSystem
100         kc     *keepClientStub
101 }
102
103 func (s *CollectionFSSuite) SetUpTest(c *check.C) {
104         s.client = NewClientFromEnv()
105         err := s.client.RequestAndDecode(&s.coll, "GET", "arvados/v1/collections/"+fixtureFooAndBarFilesInDirUUID, nil, nil)
106         c.Assert(err, check.IsNil)
107         s.kc = &keepClientStub{
108                 blocks: map[string][]byte{
109                         "3858f62230ac3c915f300c664312c63f": []byte("foobar"),
110                 },
111                 sigkey:    fixtureBlobSigningKey,
112                 sigttl:    fixtureBlobSigningTTL,
113                 authToken: fixtureActiveToken,
114         }
115         s.fs, err = s.coll.FileSystem(s.client, s.kc)
116         c.Assert(err, check.IsNil)
117 }
118
// TestHttpFileSystemInterface checks that the collection filesystem
// satisfies http.FileSystem, so net/http can serve it directly.
func (s *CollectionFSSuite) TestHttpFileSystemInterface(c *check.C) {
	var want http.FileSystem
	c.Check(s.fs, check.Implements, &want)
}
123
124 func (s *CollectionFSSuite) TestUnattainableStorageClasses(c *check.C) {
125         fs, err := (&Collection{
126                 StorageClassesDesired: []string{"unobtainium"},
127         }).FileSystem(s.client, s.kc)
128         c.Assert(err, check.IsNil)
129
130         f, err := fs.OpenFile("/foo", os.O_CREATE|os.O_WRONLY, 0777)
131         c.Assert(err, check.IsNil)
132         _, err = f.Write([]byte("food"))
133         c.Assert(err, check.IsNil)
134         err = f.Close()
135         c.Assert(err, check.IsNil)
136         _, err = fs.MarshalManifest(".")
137         c.Assert(err, check.ErrorMatches, `.*stub does not write storage class \"unobtainium\"`)
138 }
139
140 func (s *CollectionFSSuite) TestColonInFilename(c *check.C) {
141         fs, err := (&Collection{
142                 ManifestText: "./foo:foo 3858f62230ac3c915f300c664312c63f+3 0:3:bar:bar\n",
143         }).FileSystem(s.client, s.kc)
144         c.Assert(err, check.IsNil)
145
146         f, err := fs.Open("/foo:foo")
147         c.Assert(err, check.IsNil)
148
149         fis, err := f.Readdir(0)
150         c.Check(err, check.IsNil)
151         c.Check(len(fis), check.Equals, 1)
152         c.Check(fis[0].Name(), check.Equals, "bar:bar")
153 }
154
155 func (s *CollectionFSSuite) TestReaddirFull(c *check.C) {
156         f, err := s.fs.Open("/dir1")
157         c.Assert(err, check.IsNil)
158
159         st, err := f.Stat()
160         c.Assert(err, check.IsNil)
161         c.Check(st.Size(), check.Equals, int64(2))
162         c.Check(st.IsDir(), check.Equals, true)
163
164         fis, err := f.Readdir(0)
165         c.Check(err, check.IsNil)
166         c.Check(len(fis), check.Equals, 2)
167         if len(fis) > 0 {
168                 c.Check(fis[0].Size(), check.Equals, int64(3))
169         }
170 }
171
172 func (s *CollectionFSSuite) TestReaddirLimited(c *check.C) {
173         f, err := s.fs.Open("./dir1")
174         c.Assert(err, check.IsNil)
175
176         fis, err := f.Readdir(1)
177         c.Check(err, check.IsNil)
178         c.Check(len(fis), check.Equals, 1)
179         if len(fis) > 0 {
180                 c.Check(fis[0].Size(), check.Equals, int64(3))
181         }
182
183         fis, err = f.Readdir(1)
184         c.Check(err, check.IsNil)
185         c.Check(len(fis), check.Equals, 1)
186         if len(fis) > 0 {
187                 c.Check(fis[0].Size(), check.Equals, int64(3))
188         }
189
190         fis, err = f.Readdir(1)
191         c.Check(len(fis), check.Equals, 0)
192         c.Check(err, check.NotNil)
193         c.Check(err, check.Equals, io.EOF)
194
195         f, err = s.fs.Open("dir1")
196         c.Assert(err, check.IsNil)
197         fis, err = f.Readdir(1)
198         c.Check(len(fis), check.Equals, 1)
199         c.Assert(err, check.IsNil)
200         fis, err = f.Readdir(2)
201         c.Check(len(fis), check.Equals, 1)
202         c.Assert(err, check.IsNil)
203         fis, err = f.Readdir(2)
204         c.Check(len(fis), check.Equals, 0)
205         c.Assert(err, check.Equals, io.EOF)
206 }
207
208 func (s *CollectionFSSuite) TestPathMunge(c *check.C) {
209         for _, path := range []string{".", "/", "./", "///", "/../", "/./.."} {
210                 f, err := s.fs.Open(path)
211                 c.Assert(err, check.IsNil)
212
213                 st, err := f.Stat()
214                 c.Assert(err, check.IsNil)
215                 c.Check(st.Size(), check.Equals, int64(1))
216                 c.Check(st.IsDir(), check.Equals, true)
217         }
218         for _, path := range []string{"/dir1", "dir1", "./dir1", "///dir1//.//", "../dir1/../dir1/"} {
219                 c.Logf("%q", path)
220                 f, err := s.fs.Open(path)
221                 c.Assert(err, check.IsNil)
222
223                 st, err := f.Stat()
224                 c.Assert(err, check.IsNil)
225                 c.Check(st.Size(), check.Equals, int64(2))
226                 c.Check(st.IsDir(), check.Equals, true)
227         }
228 }
229
230 func (s *CollectionFSSuite) TestNotExist(c *check.C) {
231         for _, path := range []string{"/no", "no", "./no", "n/o", "/n/o"} {
232                 f, err := s.fs.Open(path)
233                 c.Assert(f, check.IsNil)
234                 c.Assert(err, check.NotNil)
235                 c.Assert(os.IsNotExist(err), check.Equals, true)
236         }
237 }
238
239 func (s *CollectionFSSuite) TestReadOnlyFile(c *check.C) {
240         f, err := s.fs.OpenFile("/dir1/foo", os.O_RDONLY, 0)
241         c.Assert(err, check.IsNil)
242         st, err := f.Stat()
243         c.Assert(err, check.IsNil)
244         c.Check(st.Size(), check.Equals, int64(3))
245         n, err := f.Write([]byte("bar"))
246         c.Check(n, check.Equals, 0)
247         c.Check(err, check.Equals, ErrReadOnlyFile)
248 }
249
// TestCreateFile creates a file whose name contains a space and writes
// to it. It checks that O_EXCL refuses to recreate the file, reopens it
// to check the size, and checks the manifest, where the space appears
// as \040.
func (s *CollectionFSSuite) TestCreateFile(c *check.C) {
	const name = "/new-file 1"

	fh, err := s.fs.OpenFile(name, os.O_RDWR|os.O_CREATE, 0)
	c.Assert(err, check.IsNil)
	info, err := fh.Stat()
	c.Assert(err, check.IsNil)
	c.Check(info.Size(), check.Equals, int64(0))

	written, err := fh.Write([]byte("bar"))
	c.Check(written, check.Equals, 3)
	c.Check(err, check.IsNil)
	c.Check(fh.Close(), check.IsNil)

	// O_EXCL must refuse an existing file.
	fh, err = s.fs.OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0)
	c.Check(fh, check.IsNil)
	c.Assert(err, check.NotNil)

	fh, err = s.fs.OpenFile(name, os.O_RDWR, 0)
	c.Assert(err, check.IsNil)
	info, err = fh.Stat()
	c.Assert(err, check.IsNil)
	c.Check(info.Size(), check.Equals, int64(3))
	c.Check(fh.Close(), check.IsNil)

	manifest, err := s.fs.MarshalManifest(".")
	c.Assert(err, check.IsNil)
	c.Check(manifest, check.Matches, `. 37b51d194a7513e45b56f6524f2d51f2\+3\+\S+ 0:3:new-file\\0401\n./dir1 .* 3:3:bar 0:3:foo\n`)
}
279
280 func (s *CollectionFSSuite) TestReadWriteFile(c *check.C) {
281         maxBlockSize = 8
282         defer func() { maxBlockSize = 2 << 26 }()
283
284         f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
285         c.Assert(err, check.IsNil)
286         defer f.Close()
287         st, err := f.Stat()
288         c.Assert(err, check.IsNil)
289         c.Check(st.Size(), check.Equals, int64(3))
290
291         f2, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
292         c.Assert(err, check.IsNil)
293         defer f2.Close()
294
295         buf := make([]byte, 64)
296         n, err := f.Read(buf)
297         c.Check(n, check.Equals, 3)
298         c.Check(err, check.Equals, io.EOF)
299         c.Check(string(buf[:3]), check.DeepEquals, "foo")
300
301         pos, err := f.Seek(-2, io.SeekCurrent)
302         c.Check(pos, check.Equals, int64(1))
303         c.Check(err, check.IsNil)
304
305         // Split a storedExtent in two, and insert a memExtent
306         n, err = f.Write([]byte("*"))
307         c.Check(n, check.Equals, 1)
308         c.Check(err, check.IsNil)
309
310         pos, err = f.Seek(0, io.SeekCurrent)
311         c.Check(pos, check.Equals, int64(2))
312         c.Check(err, check.IsNil)
313
314         pos, err = f.Seek(0, io.SeekStart)
315         c.Check(pos, check.Equals, int64(0))
316         c.Check(err, check.IsNil)
317
318         rbuf, err := ioutil.ReadAll(f)
319         c.Check(len(rbuf), check.Equals, 3)
320         c.Check(err, check.IsNil)
321         c.Check(string(rbuf), check.Equals, "f*o")
322
323         // Write multiple blocks in one call
324         f.Seek(1, io.SeekStart)
325         n, err = f.Write([]byte("0123456789abcdefg"))
326         c.Check(n, check.Equals, 17)
327         c.Check(err, check.IsNil)
328         pos, err = f.Seek(0, io.SeekCurrent)
329         c.Check(pos, check.Equals, int64(18))
330         c.Check(err, check.IsNil)
331         pos, err = f.Seek(-18, io.SeekCurrent)
332         c.Check(pos, check.Equals, int64(0))
333         c.Check(err, check.IsNil)
334         n, err = io.ReadFull(f, buf)
335         c.Check(n, check.Equals, 18)
336         c.Check(err, check.Equals, io.ErrUnexpectedEOF)
337         c.Check(string(buf[:n]), check.Equals, "f0123456789abcdefg")
338
339         buf2, err := ioutil.ReadAll(f2)
340         c.Check(err, check.IsNil)
341         c.Check(string(buf2), check.Equals, "f0123456789abcdefg")
342
343         // truncate to current size
344         err = f.Truncate(18)
345         c.Check(err, check.IsNil)
346         f2.Seek(0, io.SeekStart)
347         buf2, err = ioutil.ReadAll(f2)
348         c.Check(err, check.IsNil)
349         c.Check(string(buf2), check.Equals, "f0123456789abcdefg")
350
351         // shrink to zero some data
352         f.Truncate(15)
353         f2.Seek(0, io.SeekStart)
354         buf2, err = ioutil.ReadAll(f2)
355         c.Check(err, check.IsNil)
356         c.Check(string(buf2), check.Equals, "f0123456789abcd")
357
358         // grow to partial block/extent
359         f.Truncate(20)
360         f2.Seek(0, io.SeekStart)
361         buf2, err = ioutil.ReadAll(f2)
362         c.Check(err, check.IsNil)
363         c.Check(string(buf2), check.Equals, "f0123456789abcd\x00\x00\x00\x00\x00")
364
365         f.Truncate(0)
366         f2.Seek(0, io.SeekStart)
367         f2.Write([]byte("12345678abcdefghijkl"))
368
369         // grow to block/extent boundary
370         f.Truncate(64)
371         f2.Seek(0, io.SeekStart)
372         buf2, err = ioutil.ReadAll(f2)
373         c.Check(err, check.IsNil)
374         c.Check(len(buf2), check.Equals, 64)
375         c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 8)
376
377         // shrink to block/extent boundary
378         err = f.Truncate(32)
379         c.Check(err, check.IsNil)
380         f2.Seek(0, io.SeekStart)
381         buf2, err = ioutil.ReadAll(f2)
382         c.Check(err, check.IsNil)
383         c.Check(len(buf2), check.Equals, 32)
384         c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 4)
385
386         // shrink to partial block/extent
387         err = f.Truncate(15)
388         c.Check(err, check.IsNil)
389         f2.Seek(0, io.SeekStart)
390         buf2, err = ioutil.ReadAll(f2)
391         c.Check(err, check.IsNil)
392         c.Check(string(buf2), check.Equals, "12345678abcdefg")
393         c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 2)
394
395         // Force flush to ensure the block "12345678" gets stored, so
396         // we know what to expect in the final manifest below.
397         _, err = s.fs.MarshalManifest(".")
398         c.Check(err, check.IsNil)
399
400         // Truncate to size=3 while f2's ptr is at 15
401         err = f.Truncate(3)
402         c.Check(err, check.IsNil)
403         buf2, err = ioutil.ReadAll(f2)
404         c.Check(err, check.IsNil)
405         c.Check(string(buf2), check.Equals, "")
406         f2.Seek(0, io.SeekStart)
407         buf2, err = ioutil.ReadAll(f2)
408         c.Check(err, check.IsNil)
409         c.Check(string(buf2), check.Equals, "123")
410         c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 1)
411
412         m, err := s.fs.MarshalManifest(".")
413         c.Check(err, check.IsNil)
414         m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "")
415         c.Check(m, check.Equals, "./dir1 3858f62230ac3c915f300c664312c63f+6 25d55ad283aa400af464c76d713c07ad+8 3:3:bar 6:3:foo\n")
416         c.Check(s.fs.Size(), check.Equals, int64(6))
417 }
418
419 func (s *CollectionFSSuite) TestSeekSparse(c *check.C) {
420         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
421         c.Assert(err, check.IsNil)
422         f, err := fs.OpenFile("test", os.O_CREATE|os.O_RDWR, 0755)
423         c.Assert(err, check.IsNil)
424         defer f.Close()
425
426         checkSize := func(size int64) {
427                 fi, err := f.Stat()
428                 c.Assert(err, check.IsNil)
429                 c.Check(fi.Size(), check.Equals, size)
430
431                 f, err := fs.OpenFile("test", os.O_CREATE|os.O_RDWR, 0755)
432                 c.Assert(err, check.IsNil)
433                 defer f.Close()
434                 fi, err = f.Stat()
435                 c.Check(err, check.IsNil)
436                 c.Check(fi.Size(), check.Equals, size)
437                 pos, err := f.Seek(0, io.SeekEnd)
438                 c.Check(err, check.IsNil)
439                 c.Check(pos, check.Equals, size)
440         }
441
442         f.Seek(2, io.SeekEnd)
443         checkSize(0)
444         f.Write([]byte{1})
445         checkSize(3)
446
447         f.Seek(2, io.SeekCurrent)
448         checkSize(3)
449         f.Write([]byte{})
450         checkSize(5)
451
452         f.Seek(8, io.SeekStart)
453         checkSize(5)
454         n, err := f.Read(make([]byte, 1))
455         c.Check(n, check.Equals, 0)
456         c.Check(err, check.Equals, io.EOF)
457         checkSize(5)
458         f.Write([]byte{1, 2, 3})
459         checkSize(11)
460 }
461
462 func (s *CollectionFSSuite) TestMarshalCopiesRemoteBlocks(c *check.C) {
463         foo := "foo"
464         bar := "bar"
465         hash := map[string]string{
466                 foo: fmt.Sprintf("%x", md5.Sum([]byte(foo))),
467                 bar: fmt.Sprintf("%x", md5.Sum([]byte(bar))),
468         }
469
470         fs, err := (&Collection{
471                 ManifestText: ". " + hash[foo] + "+3+Rzaaaa-foo@bab " + hash[bar] + "+3+A12345@ffffff 0:2:fo.txt 2:4:obar.txt\n",
472         }).FileSystem(s.client, s.kc)
473         c.Assert(err, check.IsNil)
474         manifest, err := fs.MarshalManifest(".")
475         c.Check(manifest, check.Equals, "")
476         c.Check(err, check.NotNil)
477
478         s.kc.refreshable = map[string]bool{hash[bar]: true}
479
480         for _, sigIn := range []string{"Rzaaaa-foo@bab", "A12345@abcde"} {
481                 fs, err = (&Collection{
482                         ManifestText: ". " + hash[foo] + "+3+A12345@fffff " + hash[bar] + "+3+" + sigIn + " 0:2:fo.txt 2:4:obar.txt\n",
483                 }).FileSystem(s.client, s.kc)
484                 c.Assert(err, check.IsNil)
485                 manifest, err := fs.MarshalManifest(".")
486                 c.Check(err, check.IsNil)
487                 // Both blocks should now have +A signatures.
488                 c.Check(manifest, check.Matches, `.*\+A.* .*\+A.*\n`)
489                 c.Check(manifest, check.Not(check.Matches), `.*\+R.*\n`)
490         }
491 }
492
// TestMarshalSmallBlocks writes three 3-byte files with maxBlockSize=8
// and checks how MarshalManifest packs their data into blocks.
func (s *CollectionFSSuite) TestMarshalSmallBlocks(c *check.C) {
	// Restore the previous value instead of a hard-coded constant, so
	// later tests see the same maxBlockSize regardless of test order.
	origMaxBlockSize := maxBlockSize
	defer func() { maxBlockSize = origMaxBlockSize }()
	maxBlockSize = 8

	var err error
	s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
	c.Assert(err, check.IsNil)
	for _, name := range []string{"foo", "bar", "baz"} {
		f, err := s.fs.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0)
		c.Assert(err, check.IsNil)
		_, err = f.Write([]byte(name))
		c.Check(err, check.IsNil)
		c.Check(f.Close(), check.IsNil)
	}

	m, err := s.fs.MarshalManifest(".")
	c.Check(err, check.IsNil)
	m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "")
	c.Check(m, check.Equals, ". c3c23db5285662ef7172373df0003206+6 acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:bar 3:3:baz 6:3:foo\n")
}
512
513 func (s *CollectionFSSuite) TestMkdir(c *check.C) {
514         err := s.fs.Mkdir("foo/bar", 0755)
515         c.Check(err, check.Equals, os.ErrNotExist)
516
517         f, err := s.fs.OpenFile("foo/bar", os.O_CREATE, 0)
518         c.Check(err, check.Equals, os.ErrNotExist)
519
520         err = s.fs.Mkdir("foo", 0755)
521         c.Check(err, check.IsNil)
522
523         f, err = s.fs.OpenFile("foo/bar", os.O_CREATE|os.O_WRONLY, 0)
524         c.Check(err, check.IsNil)
525         if err == nil {
526                 defer f.Close()
527                 f.Write([]byte("foo"))
528         }
529
530         // mkdir fails if a file already exists with that name
531         err = s.fs.Mkdir("foo/bar", 0755)
532         c.Check(err, check.NotNil)
533
534         err = s.fs.Remove("foo/bar")
535         c.Check(err, check.IsNil)
536
537         // mkdir succeeds after the file is deleted
538         err = s.fs.Mkdir("foo/bar", 0755)
539         c.Check(err, check.IsNil)
540
541         // creating a file in a nonexistent subdir should still fail
542         f, err = s.fs.OpenFile("foo/bar/baz/foo.txt", os.O_CREATE|os.O_WRONLY, 0)
543         c.Check(err, check.Equals, os.ErrNotExist)
544
545         f, err = s.fs.OpenFile("foo/bar/foo.txt", os.O_CREATE|os.O_WRONLY, 0)
546         c.Check(err, check.IsNil)
547         if err == nil {
548                 defer f.Close()
549                 f.Write([]byte("foo"))
550         }
551
552         // creating foo/bar as a regular file should fail
553         f, err = s.fs.OpenFile("foo/bar", os.O_CREATE|os.O_EXCL, 0)
554         c.Check(err, check.NotNil)
555
556         // creating foo/bar as a directory should fail
557         f, err = s.fs.OpenFile("foo/bar", os.O_CREATE|os.O_EXCL, os.ModeDir)
558         c.Check(err, check.NotNil)
559         err = s.fs.Mkdir("foo/bar", 0755)
560         c.Check(err, check.NotNil)
561
562         m, err := s.fs.MarshalManifest(".")
563         c.Check(err, check.IsNil)
564         m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "")
565         c.Check(m, check.Equals, "./dir1 3858f62230ac3c915f300c664312c63f+6 3:3:bar 0:3:foo\n./foo/bar acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt\n")
566 }
567
// TestConcurrentWriters runs 128 goroutines that each perform 1024
// random truncates, seeks, writes, reads and manifest marshals on the
// same file. It checks only that the individual operations report no
// errors, then logs the final content.
func (s *CollectionFSSuite) TestConcurrentWriters(c *check.C) {
	if testing.Short() {
		c.Skip("slow")
	}

	// Restore the previous value instead of a hard-coded constant
	// (other tests previously restored a different constant), so
	// later tests see the same maxBlockSize regardless of test order.
	origMaxBlockSize := maxBlockSize
	defer func() { maxBlockSize = origMaxBlockSize }()
	maxBlockSize = 8

	var wg sync.WaitGroup
	for n := 0; n < 128; n++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
			// c.Assert would call FailNow from a non-test goroutine.
			// Record the failure and end this worker instead.
			if !c.Check(err, check.IsNil) {
				return
			}
			defer f.Close()
			for i := 0; i < 1024; i++ {
				r := rand.Uint32()
				switch {
				case r%11 == 0:
					_, err := s.fs.MarshalManifest(".")
					c.Check(err, check.IsNil)
				case r&3 == 0:
					f.Truncate(int64(rand.Intn(64)))
				case r&3 == 1:
					f.Seek(int64(rand.Intn(64)), io.SeekStart)
				case r&3 == 2:
					_, err := f.Write([]byte("beep boop"))
					c.Check(err, check.IsNil)
				case r&3 == 3:
					_, err := ioutil.ReadAll(f)
					c.Check(err, check.IsNil)
				}
			}
		}()
	}
	wg.Wait()

	f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
	c.Assert(err, check.IsNil)
	defer f.Close()
	buf, err := ioutil.ReadAll(f)
	c.Check(err, check.IsNil)
	c.Logf("after lots of random r/w/seek/trunc, buf is %q", buf)
}
613
// TestRandomWrites has each of ngoroutines goroutines create its own
// file and apply nwrites rounds of random truncate+write. After each
// round it checks the file content against an in-memory model. It then
// checks memory accounting, the root directory listing (one file per
// goroutine), and that the manifest can be marshaled.
func (s *CollectionFSSuite) TestRandomWrites(c *check.C) {
	// Restore the previous value instead of a hard-coded constant, so
	// later tests see the same maxBlockSize regardless of test order.
	origMaxBlockSize := maxBlockSize
	defer func() { maxBlockSize = origMaxBlockSize }()
	maxBlockSize = 40

	var err error
	s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
	c.Assert(err, check.IsNil)

	const nwrites = 256     // truncate+write rounds per goroutine
	const ngoroutines = 256 // goroutines, each writing its own file

	var wg sync.WaitGroup
	for n := 0; n < ngoroutines; n++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			expect := make([]byte, 0, 64)
			wbytes := []byte("there's no simple explanation for anything important that any of us do")
			f, err := s.fs.OpenFile(fmt.Sprintf("random-%d", n), os.O_RDWR|os.O_CREATE|os.O_EXCL, 0)
			// c.Assert would call FailNow from a non-test goroutine.
			if !c.Check(err, check.IsNil) {
				return
			}
			defer f.Close()
			for i := 0; i < nwrites; i++ {
				trunc := rand.Intn(65)
				woff := rand.Intn(trunc + 1)
				wbytes = wbytes[:rand.Intn(64-woff+1)]
				// Zero the model bytes that growing to trunc will
				// expose beyond its current length.
				for buf, i := expect[:cap(expect)], len(expect); i < trunc; i++ {
					buf[i] = 0
				}
				expect = expect[:trunc]
				if trunc < woff+len(wbytes) {
					expect = expect[:woff+len(wbytes)]
				}
				copy(expect[woff:], wbytes)
				f.Truncate(int64(trunc))
				pos, err := f.Seek(int64(woff), io.SeekStart)
				c.Check(pos, check.Equals, int64(woff))
				c.Check(err, check.IsNil)
				nw, err := f.Write(wbytes)
				c.Check(nw, check.Equals, len(wbytes))
				c.Check(err, check.IsNil)
				pos, err = f.Seek(0, io.SeekStart)
				c.Check(pos, check.Equals, int64(0))
				c.Check(err, check.IsNil)
				buf, err := ioutil.ReadAll(f)
				c.Check(string(buf), check.Equals, string(expect))
				c.Check(err, check.IsNil)
			}
		}(n)
	}
	wg.Wait()

	for n := 0; n < ngoroutines; n++ {
		f, err := s.fs.OpenFile(fmt.Sprintf("random-%d", n), os.O_RDONLY, 0)
		c.Assert(err, check.IsNil)
		f.(*filehandle).inode.(*filenode).waitPrune()
		s.checkMemSize(c, f)
		defer f.Close()
	}

	root, err := s.fs.Open("/")
	c.Assert(err, check.IsNil)
	defer root.Close()
	fi, err := root.Readdir(-1)
	c.Check(err, check.IsNil)
	// Each goroutine created exactly one file.
	c.Check(len(fi), check.Equals, ngoroutines)

	_, err = s.fs.MarshalManifest(".")
	c.Check(err, check.IsNil)
	// TODO: check manifest content
}
684
685 func (s *CollectionFSSuite) TestRemove(c *check.C) {
686         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
687         c.Assert(err, check.IsNil)
688         err = fs.Mkdir("dir0", 0755)
689         c.Assert(err, check.IsNil)
690         err = fs.Mkdir("dir1", 0755)
691         c.Assert(err, check.IsNil)
692         err = fs.Mkdir("dir1/dir2", 0755)
693         c.Assert(err, check.IsNil)
694         err = fs.Mkdir("dir1/dir3", 0755)
695         c.Assert(err, check.IsNil)
696
697         err = fs.Remove("dir0")
698         c.Check(err, check.IsNil)
699         err = fs.Remove("dir0")
700         c.Check(err, check.Equals, os.ErrNotExist)
701
702         err = fs.Remove("dir1/dir2/.")
703         c.Check(err, check.Equals, ErrInvalidArgument)
704         err = fs.Remove("dir1/dir2/..")
705         c.Check(err, check.Equals, ErrInvalidArgument)
706         err = fs.Remove("dir1")
707         c.Check(err, check.Equals, ErrDirectoryNotEmpty)
708         err = fs.Remove("dir1/dir2/../../../dir1")
709         c.Check(err, check.Equals, ErrDirectoryNotEmpty)
710         err = fs.Remove("dir1/dir3/")
711         c.Check(err, check.IsNil)
712         err = fs.RemoveAll("dir1")
713         c.Check(err, check.IsNil)
714         err = fs.RemoveAll("dir1")
715         c.Check(err, check.IsNil)
716 }
717
718 func (s *CollectionFSSuite) TestRenameError(c *check.C) {
719         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
720         c.Assert(err, check.IsNil)
721         err = fs.Mkdir("first", 0755)
722         c.Assert(err, check.IsNil)
723         err = fs.Mkdir("first/second", 0755)
724         c.Assert(err, check.IsNil)
725         f, err := fs.OpenFile("first/second/file", os.O_CREATE|os.O_WRONLY, 0755)
726         c.Assert(err, check.IsNil)
727         f.Write([]byte{1, 2, 3, 4, 5})
728         f.Close()
729         err = fs.Rename("first", "first/second/third")
730         c.Check(err, check.Equals, ErrInvalidArgument)
731         err = fs.Rename("first", "first/third")
732         c.Check(err, check.Equals, ErrInvalidArgument)
733         err = fs.Rename("first/second", "second")
734         c.Check(err, check.IsNil)
735         f, err = fs.OpenFile("second/file", 0, 0)
736         c.Assert(err, check.IsNil)
737         data, err := ioutil.ReadAll(f)
738         c.Check(err, check.IsNil)
739         c.Check(data, check.DeepEquals, []byte{1, 2, 3, 4, 5})
740 }
741
742 func (s *CollectionFSSuite) TestRenameDirectory(c *check.C) {
743         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
744         c.Assert(err, check.IsNil)
745         err = fs.Mkdir("foo", 0755)
746         c.Assert(err, check.IsNil)
747         err = fs.Mkdir("bar", 0755)
748         c.Assert(err, check.IsNil)
749         err = fs.Rename("bar", "baz")
750         c.Check(err, check.IsNil)
751         err = fs.Rename("foo", "baz")
752         c.Check(err, check.NotNil)
753         err = fs.Rename("foo", "baz/")
754         c.Check(err, check.IsNil)
755         err = fs.Rename("baz/foo", ".")
756         c.Check(err, check.Equals, ErrInvalidArgument)
757         err = fs.Rename("baz/foo/", ".")
758         c.Check(err, check.Equals, ErrInvalidArgument)
759 }
760
761 func (s *CollectionFSSuite) TestRename(c *check.C) {
762         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
763         c.Assert(err, check.IsNil)
764         const (
765                 outer = 16
766                 inner = 16
767         )
768         for i := 0; i < outer; i++ {
769                 err = fs.Mkdir(fmt.Sprintf("dir%d", i), 0755)
770                 c.Assert(err, check.IsNil)
771                 for j := 0; j < inner; j++ {
772                         err = fs.Mkdir(fmt.Sprintf("dir%d/dir%d", i, j), 0755)
773                         c.Assert(err, check.IsNil)
774                         for _, fnm := range []string{
775                                 fmt.Sprintf("dir%d/file%d", i, j),
776                                 fmt.Sprintf("dir%d/dir%d/file%d", i, j, j),
777                         } {
778                                 f, err := fs.OpenFile(fnm, os.O_CREATE|os.O_WRONLY, 0755)
779                                 c.Assert(err, check.IsNil)
780                                 _, err = f.Write([]byte("beep"))
781                                 c.Assert(err, check.IsNil)
782                                 f.Close()
783                         }
784                 }
785         }
786         var wg sync.WaitGroup
787         for i := 0; i < outer; i++ {
788                 for j := 0; j < inner; j++ {
789                         wg.Add(1)
790                         go func(i, j int) {
791                                 defer wg.Done()
792                                 oldname := fmt.Sprintf("dir%d/dir%d/file%d", i, j, j)
793                                 newname := fmt.Sprintf("dir%d/newfile%d", i, inner-j-1)
794                                 _, err := fs.Open(newname)
795                                 c.Check(err, check.Equals, os.ErrNotExist)
796                                 err = fs.Rename(oldname, newname)
797                                 c.Check(err, check.IsNil)
798                                 f, err := fs.Open(newname)
799                                 c.Check(err, check.IsNil)
800                                 f.Close()
801                         }(i, j)
802
803                         wg.Add(1)
804                         go func(i, j int) {
805                                 defer wg.Done()
806                                 // oldname does not exist
807                                 err := fs.Rename(
808                                         fmt.Sprintf("dir%d/dir%d/missing", i, j),
809                                         fmt.Sprintf("dir%d/dir%d/file%d", outer-i-1, j, j))
810                                 c.Check(err, check.ErrorMatches, `.*does not exist`)
811
812                                 // newname parent dir does not exist
813                                 err = fs.Rename(
814                                         fmt.Sprintf("dir%d/dir%d", i, j),
815                                         fmt.Sprintf("dir%d/missing/irrelevant", outer-i-1))
816                                 c.Check(err, check.ErrorMatches, `.*does not exist`)
817
818                                 // oldname parent dir is a file
819                                 err = fs.Rename(
820                                         fmt.Sprintf("dir%d/file%d/patherror", i, j),
821                                         fmt.Sprintf("dir%d/irrelevant", i))
822                                 c.Check(err, check.ErrorMatches, `.*not a directory`)
823
824                                 // newname parent dir is a file
825                                 err = fs.Rename(
826                                         fmt.Sprintf("dir%d/dir%d/file%d", i, j, j),
827                                         fmt.Sprintf("dir%d/file%d/patherror", i, inner-j-1))
828                                 c.Check(err, check.ErrorMatches, `.*not a directory`)
829                         }(i, j)
830                 }
831         }
832         wg.Wait()
833
834         f, err := fs.OpenFile("dir1/newfile3", 0, 0)
835         c.Assert(err, check.IsNil)
836         c.Check(f.Size(), check.Equals, int64(4))
837         buf, err := ioutil.ReadAll(f)
838         c.Check(buf, check.DeepEquals, []byte("beep"))
839         c.Check(err, check.IsNil)
840         _, err = fs.Open("dir1/dir1/file1")
841         c.Check(err, check.Equals, os.ErrNotExist)
842 }
843
844 func (s *CollectionFSSuite) TestPersist(c *check.C) {
845         maxBlockSize = 1024
846         defer func() { maxBlockSize = 2 << 26 }()
847
848         var err error
849         s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
850         c.Assert(err, check.IsNil)
851         err = s.fs.Mkdir("d:r", 0755)
852         c.Assert(err, check.IsNil)
853
854         expect := map[string][]byte{}
855
856         var wg sync.WaitGroup
857         for _, name := range []string{"random 1", "random:2", "random\\3", "d:r/random4"} {
858                 buf := make([]byte, 500)
859                 rand.Read(buf)
860                 expect[name] = buf
861
862                 f, err := s.fs.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0)
863                 c.Assert(err, check.IsNil)
864                 // Note: we don't close the file until after the test
865                 // is done. Writes to unclosed files should persist.
866                 defer f.Close()
867
868                 wg.Add(1)
869                 go func() {
870                         defer wg.Done()
871                         for i := 0; i < len(buf); i += 5 {
872                                 _, err := f.Write(buf[i : i+5])
873                                 c.Assert(err, check.IsNil)
874                         }
875                 }()
876         }
877         wg.Wait()
878
879         m, err := s.fs.MarshalManifest(".")
880         c.Check(err, check.IsNil)
881         c.Logf("%q", m)
882
883         root, err := s.fs.Open("/")
884         c.Assert(err, check.IsNil)
885         defer root.Close()
886         fi, err := root.Readdir(-1)
887         c.Check(err, check.IsNil)
888         c.Check(len(fi), check.Equals, 4)
889
890         persisted, err := (&Collection{ManifestText: m}).FileSystem(s.client, s.kc)
891         c.Assert(err, check.IsNil)
892
893         root, err = persisted.Open("/")
894         c.Assert(err, check.IsNil)
895         defer root.Close()
896         fi, err = root.Readdir(-1)
897         c.Check(err, check.IsNil)
898         c.Check(len(fi), check.Equals, 4)
899
900         for name, content := range expect {
901                 c.Logf("read %q", name)
902                 f, err := persisted.Open(name)
903                 c.Assert(err, check.IsNil)
904                 defer f.Close()
905                 buf, err := ioutil.ReadAll(f)
906                 c.Check(err, check.IsNil)
907                 c.Check(buf, check.DeepEquals, content)
908         }
909 }
910
911 func (s *CollectionFSSuite) TestPersistEmptyFilesAndDirs(c *check.C) {
912         var err error
913         s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
914         c.Assert(err, check.IsNil)
915         for _, name := range []string{"dir", "dir/zerodir", "empty", "not empty", "not empty/empty", "zero", "zero/zero"} {
916                 err = s.fs.Mkdir(name, 0755)
917                 c.Assert(err, check.IsNil)
918         }
919
920         expect := map[string][]byte{
921                 "0":                nil,
922                 "00":               {},
923                 "one":              {1},
924                 "dir/0":            nil,
925                 "dir/two":          {1, 2},
926                 "dir/zero":         nil,
927                 "dir/zerodir/zero": nil,
928                 "zero/zero/zero":   nil,
929         }
930         for name, data := range expect {
931                 f, err := s.fs.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0)
932                 c.Assert(err, check.IsNil)
933                 if data != nil {
934                         _, err := f.Write(data)
935                         c.Assert(err, check.IsNil)
936                 }
937                 f.Close()
938         }
939
940         m, err := s.fs.MarshalManifest(".")
941         c.Check(err, check.IsNil)
942         c.Logf("%q", m)
943
944         persisted, err := (&Collection{ManifestText: m}).FileSystem(s.client, s.kc)
945         c.Assert(err, check.IsNil)
946
947         for name, data := range expect {
948                 _, err = persisted.Open("bogus-" + name)
949                 c.Check(err, check.NotNil)
950
951                 f, err := persisted.Open(name)
952                 c.Assert(err, check.IsNil)
953
954                 if data == nil {
955                         data = []byte{}
956                 }
957                 buf, err := ioutil.ReadAll(f)
958                 c.Check(err, check.IsNil)
959                 c.Check(buf, check.DeepEquals, data)
960         }
961
962         expectDir := map[string]int{
963                 "empty":           0,
964                 "not empty":       1,
965                 "not empty/empty": 0,
966         }
967         for name, expectLen := range expectDir {
968                 _, err := persisted.Open(name + "/bogus")
969                 c.Check(err, check.NotNil)
970
971                 d, err := persisted.Open(name)
972                 defer d.Close()
973                 c.Check(err, check.IsNil)
974                 fi, err := d.Readdir(-1)
975                 c.Check(err, check.IsNil)
976                 c.Check(fi, check.HasLen, expectLen)
977         }
978 }
979
// TestOpenFileFlags checks how OpenFile handles os.O_* flags: opening a
// missing file without O_CREATE, read-only handles rejecting writes,
// O_RDWR, O_TRUNC, O_APPEND positioning, write-only handles rejecting
// reads, and rejection of O_SYNC and of the contradictory
// O_RDWR|O_WRONLY combination.
func (s *CollectionFSSuite) TestOpenFileFlags(c *check.C) {
	fs, err := (&Collection{}).FileSystem(s.client, s.kc)
	c.Assert(err, check.IsNil)

	// Without O_CREATE, a missing file cannot be opened.
	f, err := fs.OpenFile("missing", os.O_WRONLY, 0)
	c.Check(f, check.IsNil)
	c.Check(err, check.ErrorMatches, `file does not exist`)

	// O_CREATE|O_RDONLY creates an empty file, but the handle refuses
	// writes, and reading yields EOF right away.
	f, err = fs.OpenFile("new", os.O_CREATE|os.O_RDONLY, 0)
	c.Assert(err, check.IsNil)
	defer f.Close()
	n, err := f.Write([]byte{1, 2, 3})
	c.Check(n, check.Equals, 0)
	c.Check(err, check.ErrorMatches, `read-only file`)
	n, err = f.Read(make([]byte, 1))
	c.Check(n, check.Equals, 0)
	c.Check(err, check.Equals, io.EOF)
	// An O_RDWR handle on the now-existing file can write to it.
	f, err = fs.OpenFile("new", os.O_RDWR, 0)
	c.Assert(err, check.IsNil)
	defer f.Close()
	_, err = f.Write([]byte{4, 5, 6})
	c.Check(err, check.IsNil)
	fi, err := f.Stat()
	c.Assert(err, check.IsNil)
	c.Check(fi.Size(), check.Equals, int64(3))

	// O_TRUNC empties the file when it is opened.
	f, err = fs.OpenFile("new", os.O_TRUNC|os.O_RDWR, 0)
	c.Assert(err, check.IsNil)
	defer f.Close()
	pos, err := f.Seek(0, io.SeekEnd)
	c.Check(pos, check.Equals, int64(0))
	c.Check(err, check.IsNil)
	fi, err = f.Stat()
	c.Assert(err, check.IsNil)
	c.Check(fi.Size(), check.Equals, int64(0))
	fs.Remove("new")

	// O_APPEND: a write goes to the end of the file even after
	// seeking/reading earlier in it, and leaves the position at the
	// new end (1 byte read, then 3 more written after the first 3 -> 6).
	buf := make([]byte, 64)
	f, err = fs.OpenFile("append", os.O_EXCL|os.O_CREATE|os.O_RDWR|os.O_APPEND, 0)
	c.Assert(err, check.IsNil)
	f.Write([]byte{1, 2, 3})
	f.Seek(0, io.SeekStart)
	n, _ = f.Read(buf[:1])
	c.Check(n, check.Equals, 1)
	c.Check(buf[:1], check.DeepEquals, []byte{1})
	pos, err = f.Seek(0, io.SeekCurrent)
	c.Assert(err, check.IsNil)
	c.Check(pos, check.Equals, int64(1))
	f.Write([]byte{4, 5, 6})
	pos, err = f.Seek(0, io.SeekCurrent)
	c.Assert(err, check.IsNil)
	c.Check(pos, check.Equals, int64(6))
	f.Seek(0, io.SeekStart)
	n, err = f.Read(buf)
	c.Check(buf[:n], check.DeepEquals, []byte{1, 2, 3, 4, 5, 6})
	c.Check(err, check.Equals, io.EOF)
	f.Close()

	// Reopening with O_APPEND starts reading at offset 0; after reading
	// 3 bytes, a write still lands at the end (6 + 3 = 9).
	f, err = fs.OpenFile("append", os.O_RDWR|os.O_APPEND, 0)
	c.Assert(err, check.IsNil)
	pos, err = f.Seek(0, io.SeekCurrent)
	c.Check(pos, check.Equals, int64(0))
	c.Check(err, check.IsNil)
	f.Read(buf[:3])
	pos, _ = f.Seek(0, io.SeekCurrent)
	c.Check(pos, check.Equals, int64(3))
	f.Write([]byte{7, 8, 9})
	pos, err = f.Seek(0, io.SeekCurrent)
	c.Check(err, check.IsNil)
	c.Check(pos, check.Equals, int64(9))
	f.Close()

	// An O_WRONLY handle can write and seek, but reads fail with an
	// error mentioning O_WRONLY.
	f, err = fs.OpenFile("wronly", os.O_CREATE|os.O_WRONLY, 0)
	c.Assert(err, check.IsNil)
	n, err = f.Write([]byte{3, 2, 1})
	c.Check(n, check.Equals, 3)
	c.Check(err, check.IsNil)
	pos, _ = f.Seek(0, io.SeekCurrent)
	c.Check(pos, check.Equals, int64(3))
	pos, _ = f.Seek(0, io.SeekStart)
	c.Check(pos, check.Equals, int64(0))
	n, err = f.Read(buf)
	c.Check(n, check.Equals, 0)
	c.Check(err, check.ErrorMatches, `.*O_WRONLY.*`)
	// A separate O_RDONLY handle sees the data written above.
	// NOTE(review): the O_WRONLY handle is reassigned here without
	// being closed.
	f, err = fs.OpenFile("wronly", os.O_RDONLY, 0)
	c.Assert(err, check.IsNil)
	n, _ = f.Read(buf)
	c.Check(buf[:n], check.DeepEquals, []byte{3, 2, 1})

	// OpenFile rejects O_SYNC.
	f, err = fs.OpenFile("unsupported", os.O_CREATE|os.O_SYNC, 0)
	c.Check(f, check.IsNil)
	c.Check(err, check.NotNil)

	// O_RDWR and O_WRONLY together are an invalid flag combination.
	f, err = fs.OpenFile("append", os.O_RDWR|os.O_WRONLY, 0)
	c.Check(f, check.IsNil)
	c.Check(err, check.ErrorMatches, `invalid flag.*`)
}
1077
// TestFlushFullBlocksWritingLongFile writes 50000 bytes (100 x 500) to a
// single open file with maxBlockSize=1024 and concurrentWriters=2, and
// checks that full blocks are sent to Keep while the file is still being
// written: after waitPrune only one in-memory segment remains, and none
// remain after MarshalManifest.
func (s *CollectionFSSuite) TestFlushFullBlocksWritingLongFile(c *check.C) {
	// Save the package-level settings and restore them on return.
	defer func(cw, mbs int) {
		concurrentWriters = cw
		maxBlockSize = mbs
	}(concurrentWriters, maxBlockSize)
	concurrentWriters = 2
	maxBlockSize = 1024

	proceed := make(chan struct{})
	// started numbers each write as it begins; concurrent counts the
	// writes currently inside onWrite.
	var started, concurrent int32
	// NOTE(review): blk2done is a plain bool written by write #2 and read
	// by write #1. It appears to rely on the writer throttle ordering
	// write #2's completion before write #3 starts (write #1 is blocked
	// and concurrentWriters=2) -- confirm under -race.
	blk2done := false
	s.kc.onWrite = func([]byte) {
		atomic.AddInt32(&concurrent, 1)
		switch atomic.AddInt32(&started, 1) {
		case 1:
			// Wait until block 2 starts and finishes, and block 3 starts
			select {
			case <-proceed:
				c.Check(blk2done, check.Equals, true)
			case <-time.After(time.Second):
				c.Error("timed out")
			}
		case 2:
			time.Sleep(time.Millisecond)
			blk2done = true
		case 3:
			close(proceed)
		default:
			time.Sleep(time.Millisecond)
		}
		// Before this decrement, at most concurrentWriters writes
		// were inside onWrite at once.
		c.Check(atomic.AddInt32(&concurrent, -1) < int32(concurrentWriters), check.Equals, true)
	}

	fs, err := (&Collection{}).FileSystem(s.client, s.kc)
	c.Assert(err, check.IsNil)
	f, err := fs.OpenFile("50K", os.O_WRONLY|os.O_CREATE, 0)
	c.Assert(err, check.IsNil)
	defer f.Close()

	data := make([]byte, 500)
	rand.Read(data)

	for i := 0; i < 100; i++ {
		n, err := f.Write(data)
		c.Assert(n, check.Equals, len(data))
		c.Assert(err, check.IsNil)
	}

	// currentMemExtents returns the indexes of the file's segments
	// that are still held in memory (*memSegment).
	currentMemExtents := func() (memExtents []int) {
		for idx, e := range f.(*filehandle).inode.(*filenode).segments {
			switch e.(type) {
			case *memSegment:
				memExtents = append(memExtents, idx)
			}
		}
		return
	}
	f.(*filehandle).inode.(*filenode).waitPrune()
	// 50000 is not a multiple of 1024, so presumably the one remaining
	// in-memory segment is the trailing partial block.
	c.Check(currentMemExtents(), check.HasLen, 1)

	// Marshaling the manifest flushes the remaining segment as well.
	m, err := fs.MarshalManifest(".")
	c.Check(m, check.Matches, `[^:]* 0:50000:50K\n`)
	c.Check(err, check.IsNil)
	c.Check(currentMemExtents(), check.HasLen, 0)
}
1143
1144 // Ensure blocks get flushed to disk if a lot of data is written to
1145 // small files/directories without calling sync().
1146 //
1147 // Write four 512KiB files into each of 256 top-level dirs (total
1148 // 512MiB), calling Flush() every 8 dirs. Ensure memory usage never
1149 // exceeds 24MiB (4 concurrentWriters * 2MiB + 8 unflushed dirs *
1150 // 2MiB).
1151 func (s *CollectionFSSuite) TestFlushAll(c *check.C) {
1152         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1153         c.Assert(err, check.IsNil)
1154
1155         s.kc.onWrite = func([]byte) {
1156                 // discard flushed data -- otherwise the stub will use
1157                 // unlimited memory
1158                 time.Sleep(time.Millisecond)
1159                 s.kc.Lock()
1160                 defer s.kc.Unlock()
1161                 s.kc.blocks = map[string][]byte{}
1162         }
1163         for i := 0; i < 256; i++ {
1164                 buf := bytes.NewBuffer(make([]byte, 524288))
1165                 fmt.Fprintf(buf, "test file in dir%d", i)
1166
1167                 dir := fmt.Sprintf("dir%d", i)
1168                 fs.Mkdir(dir, 0755)
1169                 for j := 0; j < 2; j++ {
1170                         f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
1171                         c.Assert(err, check.IsNil)
1172                         defer f.Close()
1173                         _, err = io.Copy(f, buf)
1174                         c.Assert(err, check.IsNil)
1175                 }
1176
1177                 if i%8 == 0 {
1178                         fs.Flush("", true)
1179                 }
1180
1181                 size := fs.MemorySize()
1182                 if !c.Check(size <= 1<<24, check.Equals, true) {
1183                         c.Logf("at dir%d fs.MemorySize()=%d", i, size)
1184                         return
1185                 }
1186         }
1187 }
1188
1189 // Ensure short blocks at the end of a stream don't get flushed by
1190 // Flush(false).
1191 //
1192 // Write 67x 1MiB files to each of 8 dirs, and check that 8 full 64MiB
1193 // blocks have been flushed while 8x 3MiB is still buffered in memory.
1194 func (s *CollectionFSSuite) TestFlushFullBlocksOnly(c *check.C) {
1195         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1196         c.Assert(err, check.IsNil)
1197
1198         var flushed int64
1199         s.kc.onWrite = func(p []byte) {
1200                 atomic.AddInt64(&flushed, int64(len(p)))
1201         }
1202
1203         nDirs := int64(8)
1204         megabyte := make([]byte, 1<<20)
1205         for i := int64(0); i < nDirs; i++ {
1206                 dir := fmt.Sprintf("dir%d", i)
1207                 fs.Mkdir(dir, 0755)
1208                 for j := 0; j < 67; j++ {
1209                         f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
1210                         c.Assert(err, check.IsNil)
1211                         defer f.Close()
1212                         _, err = f.Write(megabyte)
1213                         c.Assert(err, check.IsNil)
1214                 }
1215         }
1216         c.Check(fs.MemorySize(), check.Equals, int64(nDirs*67<<20))
1217         c.Check(flushed, check.Equals, int64(0))
1218
1219         waitForFlush := func(expectUnflushed, expectFlushed int64) {
1220                 for deadline := time.Now().Add(5 * time.Second); fs.MemorySize() > expectUnflushed && time.Now().Before(deadline); time.Sleep(10 * time.Millisecond) {
1221                 }
1222                 c.Check(fs.MemorySize(), check.Equals, expectUnflushed)
1223                 c.Check(flushed, check.Equals, expectFlushed)
1224         }
1225
1226         // Nothing flushed yet
1227         waitForFlush((nDirs*67)<<20, 0)
1228
1229         // Flushing a non-empty dir "/" is non-recursive and there are
1230         // no top-level files, so this has no effect
1231         fs.Flush("/", false)
1232         waitForFlush((nDirs*67)<<20, 0)
1233
1234         // Flush the full block in dir0
1235         fs.Flush("dir0", false)
1236         waitForFlush((nDirs*67-64)<<20, 64<<20)
1237
1238         err = fs.Flush("dir-does-not-exist", false)
1239         c.Check(err, check.NotNil)
1240
1241         // Flush full blocks in all dirs
1242         fs.Flush("", false)
1243         waitForFlush(nDirs*3<<20, nDirs*64<<20)
1244
1245         // Flush non-full blocks, too
1246         fs.Flush("", true)
1247         waitForFlush(0, nDirs*67<<20)
1248 }
1249
1250 // Even when writing lots of files/dirs from different goroutines, as
1251 // long as Flush(dir,false) is called after writing each file,
1252 // unflushed data should be limited to one full block per
1253 // concurrentWriter, plus one nearly-full block at the end of each
1254 // dir/stream.
1255 func (s *CollectionFSSuite) TestMaxUnflushed(c *check.C) {
1256         nDirs := int64(8)
1257         maxUnflushed := (int64(concurrentWriters) + nDirs) << 26
1258
1259         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1260         c.Assert(err, check.IsNil)
1261
1262         release := make(chan struct{})
1263         timeout := make(chan struct{})
1264         time.AfterFunc(10*time.Second, func() { close(timeout) })
1265         var putCount, concurrency int64
1266         var unflushed int64
1267         s.kc.onWrite = func(p []byte) {
1268                 defer atomic.AddInt64(&unflushed, -int64(len(p)))
1269                 cur := atomic.AddInt64(&concurrency, 1)
1270                 defer atomic.AddInt64(&concurrency, -1)
1271                 pc := atomic.AddInt64(&putCount, 1)
1272                 if pc < int64(concurrentWriters) {
1273                         // Block until we reach concurrentWriters, to
1274                         // make sure we're really accepting concurrent
1275                         // writes.
1276                         select {
1277                         case <-release:
1278                         case <-timeout:
1279                                 c.Error("timeout")
1280                         }
1281                 } else if pc == int64(concurrentWriters) {
1282                         // Unblock the first N-1 PUT reqs.
1283                         close(release)
1284                 }
1285                 c.Assert(cur <= int64(concurrentWriters), check.Equals, true)
1286                 c.Assert(atomic.LoadInt64(&unflushed) <= maxUnflushed, check.Equals, true)
1287         }
1288
1289         var owg sync.WaitGroup
1290         megabyte := make([]byte, 1<<20)
1291         for i := int64(0); i < nDirs; i++ {
1292                 dir := fmt.Sprintf("dir%d", i)
1293                 fs.Mkdir(dir, 0755)
1294                 owg.Add(1)
1295                 go func() {
1296                         defer owg.Done()
1297                         defer fs.Flush(dir, true)
1298                         var iwg sync.WaitGroup
1299                         defer iwg.Wait()
1300                         for j := 0; j < 67; j++ {
1301                                 iwg.Add(1)
1302                                 go func(j int) {
1303                                         defer iwg.Done()
1304                                         f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
1305                                         c.Assert(err, check.IsNil)
1306                                         defer f.Close()
1307                                         n, err := f.Write(megabyte)
1308                                         c.Assert(err, check.IsNil)
1309                                         atomic.AddInt64(&unflushed, int64(n))
1310                                         fs.Flush(dir, false)
1311                                 }(j)
1312                         }
1313                 }()
1314         }
1315         owg.Wait()
1316         fs.Flush("", true)
1317 }
1318
1319 func (s *CollectionFSSuite) TestFlushStress(c *check.C) {
1320         done := false
1321         defer func() { done = true }()
1322         time.AfterFunc(10*time.Second, func() {
1323                 if !done {
1324                         pprof.Lookup("goroutine").WriteTo(os.Stderr, 1)
1325                         panic("timeout")
1326                 }
1327         })
1328
1329         wrote := 0
1330         s.kc.onWrite = func(p []byte) {
1331                 s.kc.Lock()
1332                 s.kc.blocks = map[string][]byte{}
1333                 wrote++
1334                 defer c.Logf("wrote block %d, %d bytes", wrote, len(p))
1335                 s.kc.Unlock()
1336                 time.Sleep(20 * time.Millisecond)
1337         }
1338
1339         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1340         c.Assert(err, check.IsNil)
1341
1342         data := make([]byte, 1<<20)
1343         for i := 0; i < 3; i++ {
1344                 dir := fmt.Sprintf("dir%d", i)
1345                 fs.Mkdir(dir, 0755)
1346                 for j := 0; j < 200; j++ {
1347                         data[0] = byte(j)
1348                         f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
1349                         c.Assert(err, check.IsNil)
1350                         _, err = f.Write(data)
1351                         c.Assert(err, check.IsNil)
1352                         f.Close()
1353                         fs.Flush(dir, false)
1354                 }
1355                 _, err := fs.MarshalManifest(".")
1356                 c.Check(err, check.IsNil)
1357         }
1358 }
1359
1360 func (s *CollectionFSSuite) TestFlushShort(c *check.C) {
1361         s.kc.onWrite = func([]byte) {
1362                 s.kc.Lock()
1363                 s.kc.blocks = map[string][]byte{}
1364                 s.kc.Unlock()
1365         }
1366         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1367         c.Assert(err, check.IsNil)
1368         for _, blocksize := range []int{8, 1000000} {
1369                 dir := fmt.Sprintf("dir%d", blocksize)
1370                 err = fs.Mkdir(dir, 0755)
1371                 c.Assert(err, check.IsNil)
1372                 data := make([]byte, blocksize)
1373                 for i := 0; i < 100; i++ {
1374                         f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, i), os.O_WRONLY|os.O_CREATE, 0)
1375                         c.Assert(err, check.IsNil)
1376                         _, err = f.Write(data)
1377                         c.Assert(err, check.IsNil)
1378                         f.Close()
1379                         fs.Flush(dir, false)
1380                 }
1381                 fs.Flush(dir, true)
1382                 _, err := fs.MarshalManifest(".")
1383                 c.Check(err, check.IsNil)
1384         }
1385 }
1386
1387 func (s *CollectionFSSuite) TestBrokenManifests(c *check.C) {
1388         for _, txt := range []string{
1389                 "\n",
1390                 ".\n",
1391                 ". \n",
1392                 ". d41d8cd98f00b204e9800998ecf8427e+0\n",
1393                 ". d41d8cd98f00b204e9800998ecf8427e+0 \n",
1394                 ". 0:0:foo\n",
1395                 ".  0:0:foo\n",
1396                 ". 0:0:foo 0:0:bar\n",
1397                 ". d41d8cd98f00b204e9800998ecf8427e 0:0:foo\n",
1398                 ". d41d8cd98f00b204e9800998ecf8427e+0 :0:0:foo\n",
1399                 ". d41d8cd98f00b204e9800998ecf8427e+0 foo:0:foo\n",
1400                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:foo:foo\n",
1401                 ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:foo 1:1:bar\n",
1402                 ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:\\056\n",
1403                 ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:\\056\\057\\056\n",
1404                 ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:.\n",
1405                 ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:..\n",
1406                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:..\n",
1407                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/..\n",
1408                 ". d41d8cd98f00b204e9800998ecf8427e+1 0:0:foo\n./foo d41d8cd98f00b204e9800998ecf8427e+1 0:0:bar\n",
1409                 "./foo d41d8cd98f00b204e9800998ecf8427e+1 0:0:bar\n. d41d8cd98f00b204e9800998ecf8427e+1 0:0:foo\n",
1410         } {
1411                 c.Logf("<-%q", txt)
1412                 fs, err := (&Collection{ManifestText: txt}).FileSystem(s.client, s.kc)
1413                 c.Check(fs, check.IsNil)
1414                 c.Logf("-> %s", err)
1415                 c.Check(err, check.NotNil)
1416         }
1417 }
1418
1419 func (s *CollectionFSSuite) TestEdgeCaseManifests(c *check.C) {
1420         for _, txt := range []string{
1421                 "",
1422                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo\n",
1423                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:...\n",
1424                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:. 0:0:. 0:0:\\056 0:0:\\056\n",
1425                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/. 0:0:. 0:0:foo\\057bar\\057\\056\n",
1426                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo 0:0:foo 0:0:bar\n",
1427                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/bar\n./foo d41d8cd98f00b204e9800998ecf8427e+0 0:0:bar\n",
1428         } {
1429                 c.Logf("<-%q", txt)
1430                 fs, err := (&Collection{ManifestText: txt}).FileSystem(s.client, s.kc)
1431                 c.Check(err, check.IsNil)
1432                 c.Check(fs, check.NotNil)
1433         }
1434 }
1435
// bigmanifest is a large synthetic manifest (roughly 100MB) used by
// BenchmarkParseManifest: 2000 streams ./dir0 .. ./dir1999, each with
// 100 copies of the same block locator followed by 2000 file segments
// " 1200000:300000:file{k}".
var bigmanifest = func() string {
	const (
		nStreams = 2000
		nBlocks  = 100
		nFiles   = 2000
		locator  = " d41d8cd98f00b204e9800998ecf8427e+99999"
	)
	var sb strings.Builder
	// Reserve about the final size up front (file tokens are 21-24
	// bytes each) so a ~100MB buffer isn't regrown repeatedly.
	sb.Grow(nStreams * (nBlocks*len(locator) + nFiles*24))
	for i := 0; i < nStreams; i++ {
		fmt.Fprintf(&sb, "./dir%d", i)
		for j := 0; j < nBlocks; j++ {
			sb.WriteString(locator)
		}
		for k := 0; k < nFiles; k++ {
			fmt.Fprintf(&sb, " 1200000:300000:file%d", k)
		}
		sb.WriteByte('\n')
	}
	// Unlike bytes.Buffer.String, strings.Builder.String does not copy.
	return sb.String()
}()
1450
1451 func (s *CollectionFSSuite) BenchmarkParseManifest(c *check.C) {
1452         DebugLocksPanicMode = false
1453         c.Logf("test manifest is %d bytes", len(bigmanifest))
1454         for i := 0; i < c.N; i++ {
1455                 fs, err := (&Collection{ManifestText: bigmanifest}).FileSystem(s.client, s.kc)
1456                 c.Check(err, check.IsNil)
1457                 c.Check(fs, check.NotNil)
1458         }
1459 }
1460
1461 func (s *CollectionFSSuite) checkMemSize(c *check.C, f File) {
1462         fn := f.(*filehandle).inode.(*filenode)
1463         var memsize int64
1464         for _, seg := range fn.segments {
1465                 if e, ok := seg.(*memSegment); ok {
1466                         memsize += int64(len(e.buf))
1467                 }
1468         }
1469         c.Check(fn.memsize, check.Equals, memsize)
1470 }
1471
1472 type CollectionFSUnitSuite struct{}
1473
1474 var _ = check.Suite(&CollectionFSUnitSuite{})
1475
1476 // expect ~2 seconds to load a manifest with 256K files
1477 func (s *CollectionFSUnitSuite) TestLargeManifest(c *check.C) {
1478         if testing.Short() {
1479                 c.Skip("slow")
1480         }
1481
1482         const (
1483                 dirCount  = 512
1484                 fileCount = 512
1485         )
1486
1487         mb := bytes.NewBuffer(make([]byte, 0, 40000000))
1488         for i := 0; i < dirCount; i++ {
1489                 fmt.Fprintf(mb, "./dir%d", i)
1490                 for j := 0; j <= fileCount; j++ {
1491                         fmt.Fprintf(mb, " %032x+42+A%040x@%08x", j, j, j)
1492                 }
1493                 for j := 0; j < fileCount; j++ {
1494                         fmt.Fprintf(mb, " %d:%d:dir%d/file%d", j*42+21, 42, j, j)
1495                 }
1496                 mb.Write([]byte{'\n'})
1497         }
1498         coll := Collection{ManifestText: mb.String()}
1499         c.Logf("%s built", time.Now())
1500
1501         var memstats runtime.MemStats
1502         runtime.ReadMemStats(&memstats)
1503         c.Logf("%s Alloc=%d Sys=%d", time.Now(), memstats.Alloc, memstats.Sys)
1504
1505         f, err := coll.FileSystem(nil, nil)
1506         c.Check(err, check.IsNil)
1507         c.Logf("%s loaded", time.Now())
1508         c.Check(f.Size(), check.Equals, int64(42*dirCount*fileCount))
1509
1510         for i := 0; i < dirCount; i++ {
1511                 for j := 0; j < fileCount; j++ {
1512                         f.Stat(fmt.Sprintf("./dir%d/dir%d/file%d", i, j, j))
1513                 }
1514         }
1515         c.Logf("%s Stat() x %d", time.Now(), dirCount*fileCount)
1516
1517         runtime.ReadMemStats(&memstats)
1518         c.Logf("%s Alloc=%d Sys=%d", time.Now(), memstats.Alloc, memstats.Sys)
1519 }
1520
1521 // Gocheck boilerplate
1522 func Test(t *testing.T) {
1523         check.TestingT(t)
1524 }