17853: Fix map write when only RLock held.
[arvados.git] / sdk / go / arvados / fs_collection_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package arvados
6
7 import (
8         "bytes"
9         "crypto/md5"
10         "errors"
11         "fmt"
12         "io"
13         "io/ioutil"
14         "math/rand"
15         "net/http"
16         "os"
17         "regexp"
18         "runtime"
19         "runtime/pprof"
20         "strings"
21         "sync"
22         "sync/atomic"
23         "testing"
24         "time"
25
26         check "gopkg.in/check.v1"
27 )
28
29 var _ = check.Suite(&CollectionFSSuite{})
30
// keepClientStub is an in-memory stand-in for a Keep client, used by
// the tests in this file. Block data is stored in a map keyed by the
// first 32 characters of the block locator. The embedded RWMutex is
// held by the methods below while they access the maps.
type keepClientStub struct {
	blocks      map[string][]byte
	refreshable map[string]bool
	onPut       func(bufcopy []byte) // called from PutB, before acquiring lock
	authToken   string               // client's auth token (used for signing locators)
	sigkey      string               // blob signing key
	sigttl      time.Duration        // blob signing ttl
	sync.RWMutex
}

// errStub404 is returned by ReadAt when the requested block is not
// present in the stub.
var errStub404 = errors.New("404 block not found")

// ReadAt copies data from the stored block identified by locator,
// starting at byte offset off, into p. It returns the number of bytes
// copied.
//
// It returns errStub404 if the block is not stored, and a descriptive
// error (rather than panicking) if the locator is too short to
// contain a 32-character key or if off lies outside the block. An
// offset equal to the block length is accepted and copies zero bytes.
func (kcs *keepClientStub) ReadAt(locator string, p []byte, off int) (int, error) {
	if len(locator) < 32 {
		// Same message LocalLocator uses for truncated locators.
		return 0, fmt.Errorf("bad locator: %q", locator)
	}
	kcs.RLock()
	defer kcs.RUnlock()
	buf := kcs.blocks[locator[:32]]
	if buf == nil {
		return 0, errStub404
	}
	if off < 0 || off > len(buf) {
		return 0, fmt.Errorf("offset %d out of range for %d-byte block %q", off, len(buf), locator[:32])
	}
	return copy(p, buf[off:]), nil
}
52
53 func (kcs *keepClientStub) PutB(p []byte) (string, int, error) {
54         locator := SignLocator(fmt.Sprintf("%x+%d", md5.Sum(p), len(p)), kcs.authToken, time.Now().Add(kcs.sigttl), kcs.sigttl, []byte(kcs.sigkey))
55         buf := make([]byte, len(p))
56         copy(buf, p)
57         if kcs.onPut != nil {
58                 kcs.onPut(buf)
59         }
60         kcs.Lock()
61         defer kcs.Unlock()
62         kcs.blocks[locator[:32]] = buf
63         return locator, 1, nil
64 }
65
66 var reRemoteSignature = regexp.MustCompile(`\+[AR][^+]*`)
67
68 func (kcs *keepClientStub) LocalLocator(locator string) (string, error) {
69         if strings.Contains(locator, "+A") {
70                 return locator, nil
71         }
72         kcs.Lock()
73         defer kcs.Unlock()
74         if strings.Contains(locator, "+R") {
75                 if len(locator) < 32 {
76                         return "", fmt.Errorf("bad locator: %q", locator)
77                 }
78                 if _, ok := kcs.blocks[locator[:32]]; !ok && !kcs.refreshable[locator[:32]] {
79                         return "", fmt.Errorf("kcs.refreshable[%q]==false", locator)
80                 }
81         }
82         locator = reRemoteSignature.ReplaceAllLiteralString(locator, "")
83         locator = SignLocator(locator, kcs.authToken, time.Now().Add(kcs.sigttl), kcs.sigttl, []byte(kcs.sigkey))
84         return locator, nil
85 }
86
87 type CollectionFSSuite struct {
88         client *Client
89         coll   Collection
90         fs     CollectionFileSystem
91         kc     *keepClientStub
92 }
93
94 func (s *CollectionFSSuite) SetUpTest(c *check.C) {
95         s.client = NewClientFromEnv()
96         err := s.client.RequestAndDecode(&s.coll, "GET", "arvados/v1/collections/"+fixtureFooAndBarFilesInDirUUID, nil, nil)
97         c.Assert(err, check.IsNil)
98         s.kc = &keepClientStub{
99                 blocks: map[string][]byte{
100                         "3858f62230ac3c915f300c664312c63f": []byte("foobar"),
101                 },
102                 sigkey:    fixtureBlobSigningKey,
103                 sigttl:    fixtureBlobSigningTTL,
104                 authToken: fixtureActiveToken,
105         }
106         s.fs, err = s.coll.FileSystem(s.client, s.kc)
107         c.Assert(err, check.IsNil)
108 }
109
110 func (s *CollectionFSSuite) TestHttpFileSystemInterface(c *check.C) {
111         _, ok := s.fs.(http.FileSystem)
112         c.Check(ok, check.Equals, true)
113 }
114
115 func (s *CollectionFSSuite) TestColonInFilename(c *check.C) {
116         fs, err := (&Collection{
117                 ManifestText: "./foo:foo 3858f62230ac3c915f300c664312c63f+3 0:3:bar:bar\n",
118         }).FileSystem(s.client, s.kc)
119         c.Assert(err, check.IsNil)
120
121         f, err := fs.Open("/foo:foo")
122         c.Assert(err, check.IsNil)
123
124         fis, err := f.Readdir(0)
125         c.Check(err, check.IsNil)
126         c.Check(len(fis), check.Equals, 1)
127         c.Check(fis[0].Name(), check.Equals, "bar:bar")
128 }
129
130 func (s *CollectionFSSuite) TestReaddirFull(c *check.C) {
131         f, err := s.fs.Open("/dir1")
132         c.Assert(err, check.IsNil)
133
134         st, err := f.Stat()
135         c.Assert(err, check.IsNil)
136         c.Check(st.Size(), check.Equals, int64(2))
137         c.Check(st.IsDir(), check.Equals, true)
138
139         fis, err := f.Readdir(0)
140         c.Check(err, check.IsNil)
141         c.Check(len(fis), check.Equals, 2)
142         if len(fis) > 0 {
143                 c.Check(fis[0].Size(), check.Equals, int64(3))
144         }
145 }
146
147 func (s *CollectionFSSuite) TestReaddirLimited(c *check.C) {
148         f, err := s.fs.Open("./dir1")
149         c.Assert(err, check.IsNil)
150
151         fis, err := f.Readdir(1)
152         c.Check(err, check.IsNil)
153         c.Check(len(fis), check.Equals, 1)
154         if len(fis) > 0 {
155                 c.Check(fis[0].Size(), check.Equals, int64(3))
156         }
157
158         fis, err = f.Readdir(1)
159         c.Check(err, check.IsNil)
160         c.Check(len(fis), check.Equals, 1)
161         if len(fis) > 0 {
162                 c.Check(fis[0].Size(), check.Equals, int64(3))
163         }
164
165         fis, err = f.Readdir(1)
166         c.Check(len(fis), check.Equals, 0)
167         c.Check(err, check.NotNil)
168         c.Check(err, check.Equals, io.EOF)
169
170         f, err = s.fs.Open("dir1")
171         c.Assert(err, check.IsNil)
172         fis, err = f.Readdir(1)
173         c.Check(len(fis), check.Equals, 1)
174         c.Assert(err, check.IsNil)
175         fis, err = f.Readdir(2)
176         c.Check(len(fis), check.Equals, 1)
177         c.Assert(err, check.IsNil)
178         fis, err = f.Readdir(2)
179         c.Check(len(fis), check.Equals, 0)
180         c.Assert(err, check.Equals, io.EOF)
181 }
182
183 func (s *CollectionFSSuite) TestPathMunge(c *check.C) {
184         for _, path := range []string{".", "/", "./", "///", "/../", "/./.."} {
185                 f, err := s.fs.Open(path)
186                 c.Assert(err, check.IsNil)
187
188                 st, err := f.Stat()
189                 c.Assert(err, check.IsNil)
190                 c.Check(st.Size(), check.Equals, int64(1))
191                 c.Check(st.IsDir(), check.Equals, true)
192         }
193         for _, path := range []string{"/dir1", "dir1", "./dir1", "///dir1//.//", "../dir1/../dir1/"} {
194                 c.Logf("%q", path)
195                 f, err := s.fs.Open(path)
196                 c.Assert(err, check.IsNil)
197
198                 st, err := f.Stat()
199                 c.Assert(err, check.IsNil)
200                 c.Check(st.Size(), check.Equals, int64(2))
201                 c.Check(st.IsDir(), check.Equals, true)
202         }
203 }
204
205 func (s *CollectionFSSuite) TestNotExist(c *check.C) {
206         for _, path := range []string{"/no", "no", "./no", "n/o", "/n/o"} {
207                 f, err := s.fs.Open(path)
208                 c.Assert(f, check.IsNil)
209                 c.Assert(err, check.NotNil)
210                 c.Assert(os.IsNotExist(err), check.Equals, true)
211         }
212 }
213
214 func (s *CollectionFSSuite) TestReadOnlyFile(c *check.C) {
215         f, err := s.fs.OpenFile("/dir1/foo", os.O_RDONLY, 0)
216         c.Assert(err, check.IsNil)
217         st, err := f.Stat()
218         c.Assert(err, check.IsNil)
219         c.Check(st.Size(), check.Equals, int64(3))
220         n, err := f.Write([]byte("bar"))
221         c.Check(n, check.Equals, 0)
222         c.Check(err, check.Equals, ErrReadOnlyFile)
223 }
224
225 func (s *CollectionFSSuite) TestCreateFile(c *check.C) {
226         f, err := s.fs.OpenFile("/new-file 1", os.O_RDWR|os.O_CREATE, 0)
227         c.Assert(err, check.IsNil)
228         st, err := f.Stat()
229         c.Assert(err, check.IsNil)
230         c.Check(st.Size(), check.Equals, int64(0))
231
232         n, err := f.Write([]byte("bar"))
233         c.Check(n, check.Equals, 3)
234         c.Check(err, check.IsNil)
235
236         c.Check(f.Close(), check.IsNil)
237
238         f, err = s.fs.OpenFile("/new-file 1", os.O_RDWR|os.O_CREATE|os.O_EXCL, 0)
239         c.Check(f, check.IsNil)
240         c.Assert(err, check.NotNil)
241
242         f, err = s.fs.OpenFile("/new-file 1", os.O_RDWR, 0)
243         c.Assert(err, check.IsNil)
244         st, err = f.Stat()
245         c.Assert(err, check.IsNil)
246         c.Check(st.Size(), check.Equals, int64(3))
247
248         c.Check(f.Close(), check.IsNil)
249
250         m, err := s.fs.MarshalManifest(".")
251         c.Assert(err, check.IsNil)
252         c.Check(m, check.Matches, `. 37b51d194a7513e45b56f6524f2d51f2\+3\+\S+ 0:3:new-file\\0401\n./dir1 .* 3:3:bar 0:3:foo\n`)
253 }
254
255 func (s *CollectionFSSuite) TestReadWriteFile(c *check.C) {
256         maxBlockSize = 8
257         defer func() { maxBlockSize = 2 << 26 }()
258
259         f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
260         c.Assert(err, check.IsNil)
261         defer f.Close()
262         st, err := f.Stat()
263         c.Assert(err, check.IsNil)
264         c.Check(st.Size(), check.Equals, int64(3))
265
266         f2, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
267         c.Assert(err, check.IsNil)
268         defer f2.Close()
269
270         buf := make([]byte, 64)
271         n, err := f.Read(buf)
272         c.Check(n, check.Equals, 3)
273         c.Check(err, check.Equals, io.EOF)
274         c.Check(string(buf[:3]), check.DeepEquals, "foo")
275
276         pos, err := f.Seek(-2, io.SeekCurrent)
277         c.Check(pos, check.Equals, int64(1))
278         c.Check(err, check.IsNil)
279
280         // Split a storedExtent in two, and insert a memExtent
281         n, err = f.Write([]byte("*"))
282         c.Check(n, check.Equals, 1)
283         c.Check(err, check.IsNil)
284
285         pos, err = f.Seek(0, io.SeekCurrent)
286         c.Check(pos, check.Equals, int64(2))
287         c.Check(err, check.IsNil)
288
289         pos, err = f.Seek(0, io.SeekStart)
290         c.Check(pos, check.Equals, int64(0))
291         c.Check(err, check.IsNil)
292
293         rbuf, err := ioutil.ReadAll(f)
294         c.Check(len(rbuf), check.Equals, 3)
295         c.Check(err, check.IsNil)
296         c.Check(string(rbuf), check.Equals, "f*o")
297
298         // Write multiple blocks in one call
299         f.Seek(1, io.SeekStart)
300         n, err = f.Write([]byte("0123456789abcdefg"))
301         c.Check(n, check.Equals, 17)
302         c.Check(err, check.IsNil)
303         pos, err = f.Seek(0, io.SeekCurrent)
304         c.Check(pos, check.Equals, int64(18))
305         c.Check(err, check.IsNil)
306         pos, err = f.Seek(-18, io.SeekCurrent)
307         c.Check(pos, check.Equals, int64(0))
308         c.Check(err, check.IsNil)
309         n, err = io.ReadFull(f, buf)
310         c.Check(n, check.Equals, 18)
311         c.Check(err, check.Equals, io.ErrUnexpectedEOF)
312         c.Check(string(buf[:n]), check.Equals, "f0123456789abcdefg")
313
314         buf2, err := ioutil.ReadAll(f2)
315         c.Check(err, check.IsNil)
316         c.Check(string(buf2), check.Equals, "f0123456789abcdefg")
317
318         // truncate to current size
319         err = f.Truncate(18)
320         c.Check(err, check.IsNil)
321         f2.Seek(0, io.SeekStart)
322         buf2, err = ioutil.ReadAll(f2)
323         c.Check(err, check.IsNil)
324         c.Check(string(buf2), check.Equals, "f0123456789abcdefg")
325
326         // shrink to zero some data
327         f.Truncate(15)
328         f2.Seek(0, io.SeekStart)
329         buf2, err = ioutil.ReadAll(f2)
330         c.Check(err, check.IsNil)
331         c.Check(string(buf2), check.Equals, "f0123456789abcd")
332
333         // grow to partial block/extent
334         f.Truncate(20)
335         f2.Seek(0, io.SeekStart)
336         buf2, err = ioutil.ReadAll(f2)
337         c.Check(err, check.IsNil)
338         c.Check(string(buf2), check.Equals, "f0123456789abcd\x00\x00\x00\x00\x00")
339
340         f.Truncate(0)
341         f2.Seek(0, io.SeekStart)
342         f2.Write([]byte("12345678abcdefghijkl"))
343
344         // grow to block/extent boundary
345         f.Truncate(64)
346         f2.Seek(0, io.SeekStart)
347         buf2, err = ioutil.ReadAll(f2)
348         c.Check(err, check.IsNil)
349         c.Check(len(buf2), check.Equals, 64)
350         c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 8)
351
352         // shrink to block/extent boundary
353         err = f.Truncate(32)
354         c.Check(err, check.IsNil)
355         f2.Seek(0, io.SeekStart)
356         buf2, err = ioutil.ReadAll(f2)
357         c.Check(err, check.IsNil)
358         c.Check(len(buf2), check.Equals, 32)
359         c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 4)
360
361         // shrink to partial block/extent
362         err = f.Truncate(15)
363         c.Check(err, check.IsNil)
364         f2.Seek(0, io.SeekStart)
365         buf2, err = ioutil.ReadAll(f2)
366         c.Check(err, check.IsNil)
367         c.Check(string(buf2), check.Equals, "12345678abcdefg")
368         c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 2)
369
370         // Force flush to ensure the block "12345678" gets stored, so
371         // we know what to expect in the final manifest below.
372         _, err = s.fs.MarshalManifest(".")
373         c.Check(err, check.IsNil)
374
375         // Truncate to size=3 while f2's ptr is at 15
376         err = f.Truncate(3)
377         c.Check(err, check.IsNil)
378         buf2, err = ioutil.ReadAll(f2)
379         c.Check(err, check.IsNil)
380         c.Check(string(buf2), check.Equals, "")
381         f2.Seek(0, io.SeekStart)
382         buf2, err = ioutil.ReadAll(f2)
383         c.Check(err, check.IsNil)
384         c.Check(string(buf2), check.Equals, "123")
385         c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 1)
386
387         m, err := s.fs.MarshalManifest(".")
388         c.Check(err, check.IsNil)
389         m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "")
390         c.Check(m, check.Equals, "./dir1 3858f62230ac3c915f300c664312c63f+6 25d55ad283aa400af464c76d713c07ad+8 3:3:bar 6:3:foo\n")
391         c.Check(s.fs.Size(), check.Equals, int64(6))
392 }
393
394 func (s *CollectionFSSuite) TestSeekSparse(c *check.C) {
395         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
396         c.Assert(err, check.IsNil)
397         f, err := fs.OpenFile("test", os.O_CREATE|os.O_RDWR, 0755)
398         c.Assert(err, check.IsNil)
399         defer f.Close()
400
401         checkSize := func(size int64) {
402                 fi, err := f.Stat()
403                 c.Assert(err, check.IsNil)
404                 c.Check(fi.Size(), check.Equals, size)
405
406                 f, err := fs.OpenFile("test", os.O_CREATE|os.O_RDWR, 0755)
407                 c.Assert(err, check.IsNil)
408                 defer f.Close()
409                 fi, err = f.Stat()
410                 c.Check(err, check.IsNil)
411                 c.Check(fi.Size(), check.Equals, size)
412                 pos, err := f.Seek(0, io.SeekEnd)
413                 c.Check(err, check.IsNil)
414                 c.Check(pos, check.Equals, size)
415         }
416
417         f.Seek(2, io.SeekEnd)
418         checkSize(0)
419         f.Write([]byte{1})
420         checkSize(3)
421
422         f.Seek(2, io.SeekCurrent)
423         checkSize(3)
424         f.Write([]byte{})
425         checkSize(5)
426
427         f.Seek(8, io.SeekStart)
428         checkSize(5)
429         n, err := f.Read(make([]byte, 1))
430         c.Check(n, check.Equals, 0)
431         c.Check(err, check.Equals, io.EOF)
432         checkSize(5)
433         f.Write([]byte{1, 2, 3})
434         checkSize(11)
435 }
436
437 func (s *CollectionFSSuite) TestMarshalCopiesRemoteBlocks(c *check.C) {
438         foo := "foo"
439         bar := "bar"
440         hash := map[string]string{
441                 foo: fmt.Sprintf("%x", md5.Sum([]byte(foo))),
442                 bar: fmt.Sprintf("%x", md5.Sum([]byte(bar))),
443         }
444
445         fs, err := (&Collection{
446                 ManifestText: ". " + hash[foo] + "+3+Rzaaaa-foo@bab " + hash[bar] + "+3+A12345@ffffff 0:2:fo.txt 2:4:obar.txt\n",
447         }).FileSystem(s.client, s.kc)
448         c.Assert(err, check.IsNil)
449         manifest, err := fs.MarshalManifest(".")
450         c.Check(manifest, check.Equals, "")
451         c.Check(err, check.NotNil)
452
453         s.kc.refreshable = map[string]bool{hash[bar]: true}
454
455         for _, sigIn := range []string{"Rzaaaa-foo@bab", "A12345@abcde"} {
456                 fs, err = (&Collection{
457                         ManifestText: ". " + hash[foo] + "+3+A12345@fffff " + hash[bar] + "+3+" + sigIn + " 0:2:fo.txt 2:4:obar.txt\n",
458                 }).FileSystem(s.client, s.kc)
459                 c.Assert(err, check.IsNil)
460                 manifest, err := fs.MarshalManifest(".")
461                 c.Check(err, check.IsNil)
462                 // Both blocks should now have +A signatures.
463                 c.Check(manifest, check.Matches, `.*\+A.* .*\+A.*\n`)
464                 c.Check(manifest, check.Not(check.Matches), `.*\+R.*\n`)
465         }
466 }
467
468 func (s *CollectionFSSuite) TestMarshalSmallBlocks(c *check.C) {
469         maxBlockSize = 8
470         defer func() { maxBlockSize = 2 << 26 }()
471
472         var err error
473         s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
474         c.Assert(err, check.IsNil)
475         for _, name := range []string{"foo", "bar", "baz"} {
476                 f, err := s.fs.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0)
477                 c.Assert(err, check.IsNil)
478                 f.Write([]byte(name))
479                 f.Close()
480         }
481
482         m, err := s.fs.MarshalManifest(".")
483         c.Check(err, check.IsNil)
484         m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "")
485         c.Check(m, check.Equals, ". c3c23db5285662ef7172373df0003206+6 acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:bar 3:3:baz 6:3:foo\n")
486 }
487
488 func (s *CollectionFSSuite) TestMkdir(c *check.C) {
489         err := s.fs.Mkdir("foo/bar", 0755)
490         c.Check(err, check.Equals, os.ErrNotExist)
491
492         f, err := s.fs.OpenFile("foo/bar", os.O_CREATE, 0)
493         c.Check(err, check.Equals, os.ErrNotExist)
494
495         err = s.fs.Mkdir("foo", 0755)
496         c.Check(err, check.IsNil)
497
498         f, err = s.fs.OpenFile("foo/bar", os.O_CREATE|os.O_WRONLY, 0)
499         c.Check(err, check.IsNil)
500         if err == nil {
501                 defer f.Close()
502                 f.Write([]byte("foo"))
503         }
504
505         // mkdir fails if a file already exists with that name
506         err = s.fs.Mkdir("foo/bar", 0755)
507         c.Check(err, check.NotNil)
508
509         err = s.fs.Remove("foo/bar")
510         c.Check(err, check.IsNil)
511
512         // mkdir succeeds after the file is deleted
513         err = s.fs.Mkdir("foo/bar", 0755)
514         c.Check(err, check.IsNil)
515
516         // creating a file in a nonexistent subdir should still fail
517         f, err = s.fs.OpenFile("foo/bar/baz/foo.txt", os.O_CREATE|os.O_WRONLY, 0)
518         c.Check(err, check.Equals, os.ErrNotExist)
519
520         f, err = s.fs.OpenFile("foo/bar/foo.txt", os.O_CREATE|os.O_WRONLY, 0)
521         c.Check(err, check.IsNil)
522         if err == nil {
523                 defer f.Close()
524                 f.Write([]byte("foo"))
525         }
526
527         // creating foo/bar as a regular file should fail
528         f, err = s.fs.OpenFile("foo/bar", os.O_CREATE|os.O_EXCL, 0)
529         c.Check(err, check.NotNil)
530
531         // creating foo/bar as a directory should fail
532         f, err = s.fs.OpenFile("foo/bar", os.O_CREATE|os.O_EXCL, os.ModeDir)
533         c.Check(err, check.NotNil)
534         err = s.fs.Mkdir("foo/bar", 0755)
535         c.Check(err, check.NotNil)
536
537         m, err := s.fs.MarshalManifest(".")
538         c.Check(err, check.IsNil)
539         m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "")
540         c.Check(m, check.Equals, "./dir1 3858f62230ac3c915f300c664312c63f+6 3:3:bar 0:3:foo\n./foo/bar acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt\n")
541 }
542
543 func (s *CollectionFSSuite) TestConcurrentWriters(c *check.C) {
544         if testing.Short() {
545                 c.Skip("slow")
546         }
547
548         maxBlockSize = 8
549         defer func() { maxBlockSize = 1 << 26 }()
550
551         var wg sync.WaitGroup
552         for n := 0; n < 128; n++ {
553                 wg.Add(1)
554                 go func() {
555                         defer wg.Done()
556                         f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
557                         c.Assert(err, check.IsNil)
558                         defer f.Close()
559                         for i := 0; i < 1024; i++ {
560                                 r := rand.Uint32()
561                                 switch {
562                                 case r%11 == 0:
563                                         _, err := s.fs.MarshalManifest(".")
564                                         c.Check(err, check.IsNil)
565                                 case r&3 == 0:
566                                         f.Truncate(int64(rand.Intn(64)))
567                                 case r&3 == 1:
568                                         f.Seek(int64(rand.Intn(64)), io.SeekStart)
569                                 case r&3 == 2:
570                                         _, err := f.Write([]byte("beep boop"))
571                                         c.Check(err, check.IsNil)
572                                 case r&3 == 3:
573                                         _, err := ioutil.ReadAll(f)
574                                         c.Check(err, check.IsNil)
575                                 }
576                         }
577                 }()
578         }
579         wg.Wait()
580
581         f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
582         c.Assert(err, check.IsNil)
583         defer f.Close()
584         buf, err := ioutil.ReadAll(f)
585         c.Check(err, check.IsNil)
586         c.Logf("after lots of random r/w/seek/trunc, buf is %q", buf)
587 }
588
589 func (s *CollectionFSSuite) TestRandomWrites(c *check.C) {
590         maxBlockSize = 40
591         defer func() { maxBlockSize = 2 << 26 }()
592
593         var err error
594         s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
595         c.Assert(err, check.IsNil)
596
597         const nfiles = 256
598         const ngoroutines = 256
599
600         var wg sync.WaitGroup
601         for n := 0; n < ngoroutines; n++ {
602                 wg.Add(1)
603                 go func(n int) {
604                         defer wg.Done()
605                         expect := make([]byte, 0, 64)
606                         wbytes := []byte("there's no simple explanation for anything important that any of us do")
607                         f, err := s.fs.OpenFile(fmt.Sprintf("random-%d", n), os.O_RDWR|os.O_CREATE|os.O_EXCL, 0)
608                         c.Assert(err, check.IsNil)
609                         defer f.Close()
610                         for i := 0; i < nfiles; i++ {
611                                 trunc := rand.Intn(65)
612                                 woff := rand.Intn(trunc + 1)
613                                 wbytes = wbytes[:rand.Intn(64-woff+1)]
614                                 for buf, i := expect[:cap(expect)], len(expect); i < trunc; i++ {
615                                         buf[i] = 0
616                                 }
617                                 expect = expect[:trunc]
618                                 if trunc < woff+len(wbytes) {
619                                         expect = expect[:woff+len(wbytes)]
620                                 }
621                                 copy(expect[woff:], wbytes)
622                                 f.Truncate(int64(trunc))
623                                 pos, err := f.Seek(int64(woff), io.SeekStart)
624                                 c.Check(pos, check.Equals, int64(woff))
625                                 c.Check(err, check.IsNil)
626                                 n, err := f.Write(wbytes)
627                                 c.Check(n, check.Equals, len(wbytes))
628                                 c.Check(err, check.IsNil)
629                                 pos, err = f.Seek(0, io.SeekStart)
630                                 c.Check(pos, check.Equals, int64(0))
631                                 c.Check(err, check.IsNil)
632                                 buf, err := ioutil.ReadAll(f)
633                                 c.Check(string(buf), check.Equals, string(expect))
634                                 c.Check(err, check.IsNil)
635                         }
636                 }(n)
637         }
638         wg.Wait()
639
640         for n := 0; n < ngoroutines; n++ {
641                 f, err := s.fs.OpenFile(fmt.Sprintf("random-%d", n), os.O_RDONLY, 0)
642                 c.Assert(err, check.IsNil)
643                 f.(*filehandle).inode.(*filenode).waitPrune()
644                 s.checkMemSize(c, f)
645                 defer f.Close()
646         }
647
648         root, err := s.fs.Open("/")
649         c.Assert(err, check.IsNil)
650         defer root.Close()
651         fi, err := root.Readdir(-1)
652         c.Check(err, check.IsNil)
653         c.Check(len(fi), check.Equals, nfiles)
654
655         _, err = s.fs.MarshalManifest(".")
656         c.Check(err, check.IsNil)
657         // TODO: check manifest content
658 }
659
660 func (s *CollectionFSSuite) TestRemove(c *check.C) {
661         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
662         c.Assert(err, check.IsNil)
663         err = fs.Mkdir("dir0", 0755)
664         c.Assert(err, check.IsNil)
665         err = fs.Mkdir("dir1", 0755)
666         c.Assert(err, check.IsNil)
667         err = fs.Mkdir("dir1/dir2", 0755)
668         c.Assert(err, check.IsNil)
669         err = fs.Mkdir("dir1/dir3", 0755)
670         c.Assert(err, check.IsNil)
671
672         err = fs.Remove("dir0")
673         c.Check(err, check.IsNil)
674         err = fs.Remove("dir0")
675         c.Check(err, check.Equals, os.ErrNotExist)
676
677         err = fs.Remove("dir1/dir2/.")
678         c.Check(err, check.Equals, ErrInvalidArgument)
679         err = fs.Remove("dir1/dir2/..")
680         c.Check(err, check.Equals, ErrInvalidArgument)
681         err = fs.Remove("dir1")
682         c.Check(err, check.Equals, ErrDirectoryNotEmpty)
683         err = fs.Remove("dir1/dir2/../../../dir1")
684         c.Check(err, check.Equals, ErrDirectoryNotEmpty)
685         err = fs.Remove("dir1/dir3/")
686         c.Check(err, check.IsNil)
687         err = fs.RemoveAll("dir1")
688         c.Check(err, check.IsNil)
689         err = fs.RemoveAll("dir1")
690         c.Check(err, check.IsNil)
691 }
692
693 func (s *CollectionFSSuite) TestRenameError(c *check.C) {
694         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
695         c.Assert(err, check.IsNil)
696         err = fs.Mkdir("first", 0755)
697         c.Assert(err, check.IsNil)
698         err = fs.Mkdir("first/second", 0755)
699         c.Assert(err, check.IsNil)
700         f, err := fs.OpenFile("first/second/file", os.O_CREATE|os.O_WRONLY, 0755)
701         c.Assert(err, check.IsNil)
702         f.Write([]byte{1, 2, 3, 4, 5})
703         f.Close()
704         err = fs.Rename("first", "first/second/third")
705         c.Check(err, check.Equals, ErrInvalidArgument)
706         err = fs.Rename("first", "first/third")
707         c.Check(err, check.Equals, ErrInvalidArgument)
708         err = fs.Rename("first/second", "second")
709         c.Check(err, check.IsNil)
710         f, err = fs.OpenFile("second/file", 0, 0)
711         c.Assert(err, check.IsNil)
712         data, err := ioutil.ReadAll(f)
713         c.Check(err, check.IsNil)
714         c.Check(data, check.DeepEquals, []byte{1, 2, 3, 4, 5})
715 }
716
717 func (s *CollectionFSSuite) TestRenameDirectory(c *check.C) {
718         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
719         c.Assert(err, check.IsNil)
720         err = fs.Mkdir("foo", 0755)
721         c.Assert(err, check.IsNil)
722         err = fs.Mkdir("bar", 0755)
723         c.Assert(err, check.IsNil)
724         err = fs.Rename("bar", "baz")
725         c.Check(err, check.IsNil)
726         err = fs.Rename("foo", "baz")
727         c.Check(err, check.NotNil)
728         err = fs.Rename("foo", "baz/")
729         c.Check(err, check.IsNil)
730         err = fs.Rename("baz/foo", ".")
731         c.Check(err, check.Equals, ErrInvalidArgument)
732         err = fs.Rename("baz/foo/", ".")
733         c.Check(err, check.Equals, ErrInvalidArgument)
734 }
735
736 func (s *CollectionFSSuite) TestRename(c *check.C) {
737         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
738         c.Assert(err, check.IsNil)
739         const (
740                 outer = 16
741                 inner = 16
742         )
743         for i := 0; i < outer; i++ {
744                 err = fs.Mkdir(fmt.Sprintf("dir%d", i), 0755)
745                 c.Assert(err, check.IsNil)
746                 for j := 0; j < inner; j++ {
747                         err = fs.Mkdir(fmt.Sprintf("dir%d/dir%d", i, j), 0755)
748                         c.Assert(err, check.IsNil)
749                         for _, fnm := range []string{
750                                 fmt.Sprintf("dir%d/file%d", i, j),
751                                 fmt.Sprintf("dir%d/dir%d/file%d", i, j, j),
752                         } {
753                                 f, err := fs.OpenFile(fnm, os.O_CREATE|os.O_WRONLY, 0755)
754                                 c.Assert(err, check.IsNil)
755                                 _, err = f.Write([]byte("beep"))
756                                 c.Assert(err, check.IsNil)
757                                 f.Close()
758                         }
759                 }
760         }
761         var wg sync.WaitGroup
762         for i := 0; i < outer; i++ {
763                 for j := 0; j < inner; j++ {
764                         wg.Add(1)
765                         go func(i, j int) {
766                                 defer wg.Done()
767                                 oldname := fmt.Sprintf("dir%d/dir%d/file%d", i, j, j)
768                                 newname := fmt.Sprintf("dir%d/newfile%d", i, inner-j-1)
769                                 _, err := fs.Open(newname)
770                                 c.Check(err, check.Equals, os.ErrNotExist)
771                                 err = fs.Rename(oldname, newname)
772                                 c.Check(err, check.IsNil)
773                                 f, err := fs.Open(newname)
774                                 c.Check(err, check.IsNil)
775                                 f.Close()
776                         }(i, j)
777
778                         wg.Add(1)
779                         go func(i, j int) {
780                                 defer wg.Done()
781                                 // oldname does not exist
782                                 err := fs.Rename(
783                                         fmt.Sprintf("dir%d/dir%d/missing", i, j),
784                                         fmt.Sprintf("dir%d/dir%d/file%d", outer-i-1, j, j))
785                                 c.Check(err, check.ErrorMatches, `.*does not exist`)
786
787                                 // newname parent dir does not exist
788                                 err = fs.Rename(
789                                         fmt.Sprintf("dir%d/dir%d", i, j),
790                                         fmt.Sprintf("dir%d/missing/irrelevant", outer-i-1))
791                                 c.Check(err, check.ErrorMatches, `.*does not exist`)
792
793                                 // oldname parent dir is a file
794                                 err = fs.Rename(
795                                         fmt.Sprintf("dir%d/file%d/patherror", i, j),
796                                         fmt.Sprintf("dir%d/irrelevant", i))
797                                 c.Check(err, check.ErrorMatches, `.*not a directory`)
798
799                                 // newname parent dir is a file
800                                 err = fs.Rename(
801                                         fmt.Sprintf("dir%d/dir%d/file%d", i, j, j),
802                                         fmt.Sprintf("dir%d/file%d/patherror", i, inner-j-1))
803                                 c.Check(err, check.ErrorMatches, `.*not a directory`)
804                         }(i, j)
805                 }
806         }
807         wg.Wait()
808
809         f, err := fs.OpenFile("dir1/newfile3", 0, 0)
810         c.Assert(err, check.IsNil)
811         c.Check(f.Size(), check.Equals, int64(4))
812         buf, err := ioutil.ReadAll(f)
813         c.Check(buf, check.DeepEquals, []byte("beep"))
814         c.Check(err, check.IsNil)
815         _, err = fs.Open("dir1/dir1/file1")
816         c.Check(err, check.Equals, os.ErrNotExist)
817 }
818
819 func (s *CollectionFSSuite) TestPersist(c *check.C) {
820         maxBlockSize = 1024
821         defer func() { maxBlockSize = 2 << 26 }()
822
823         var err error
824         s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
825         c.Assert(err, check.IsNil)
826         err = s.fs.Mkdir("d:r", 0755)
827         c.Assert(err, check.IsNil)
828
829         expect := map[string][]byte{}
830
831         var wg sync.WaitGroup
832         for _, name := range []string{"random 1", "random:2", "random\\3", "d:r/random4"} {
833                 buf := make([]byte, 500)
834                 rand.Read(buf)
835                 expect[name] = buf
836
837                 f, err := s.fs.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0)
838                 c.Assert(err, check.IsNil)
839                 // Note: we don't close the file until after the test
840                 // is done. Writes to unclosed files should persist.
841                 defer f.Close()
842
843                 wg.Add(1)
844                 go func() {
845                         defer wg.Done()
846                         for i := 0; i < len(buf); i += 5 {
847                                 _, err := f.Write(buf[i : i+5])
848                                 c.Assert(err, check.IsNil)
849                         }
850                 }()
851         }
852         wg.Wait()
853
854         m, err := s.fs.MarshalManifest(".")
855         c.Check(err, check.IsNil)
856         c.Logf("%q", m)
857
858         root, err := s.fs.Open("/")
859         c.Assert(err, check.IsNil)
860         defer root.Close()
861         fi, err := root.Readdir(-1)
862         c.Check(err, check.IsNil)
863         c.Check(len(fi), check.Equals, 4)
864
865         persisted, err := (&Collection{ManifestText: m}).FileSystem(s.client, s.kc)
866         c.Assert(err, check.IsNil)
867
868         root, err = persisted.Open("/")
869         c.Assert(err, check.IsNil)
870         defer root.Close()
871         fi, err = root.Readdir(-1)
872         c.Check(err, check.IsNil)
873         c.Check(len(fi), check.Equals, 4)
874
875         for name, content := range expect {
876                 c.Logf("read %q", name)
877                 f, err := persisted.Open(name)
878                 c.Assert(err, check.IsNil)
879                 defer f.Close()
880                 buf, err := ioutil.ReadAll(f)
881                 c.Check(err, check.IsNil)
882                 c.Check(buf, check.DeepEquals, content)
883         }
884 }
885
886 func (s *CollectionFSSuite) TestPersistEmptyFilesAndDirs(c *check.C) {
887         var err error
888         s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
889         c.Assert(err, check.IsNil)
890         for _, name := range []string{"dir", "dir/zerodir", "empty", "not empty", "not empty/empty", "zero", "zero/zero"} {
891                 err = s.fs.Mkdir(name, 0755)
892                 c.Assert(err, check.IsNil)
893         }
894
895         expect := map[string][]byte{
896                 "0":                nil,
897                 "00":               {},
898                 "one":              {1},
899                 "dir/0":            nil,
900                 "dir/two":          {1, 2},
901                 "dir/zero":         nil,
902                 "dir/zerodir/zero": nil,
903                 "zero/zero/zero":   nil,
904         }
905         for name, data := range expect {
906                 f, err := s.fs.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0)
907                 c.Assert(err, check.IsNil)
908                 if data != nil {
909                         _, err := f.Write(data)
910                         c.Assert(err, check.IsNil)
911                 }
912                 f.Close()
913         }
914
915         m, err := s.fs.MarshalManifest(".")
916         c.Check(err, check.IsNil)
917         c.Logf("%q", m)
918
919         persisted, err := (&Collection{ManifestText: m}).FileSystem(s.client, s.kc)
920         c.Assert(err, check.IsNil)
921
922         for name, data := range expect {
923                 _, err = persisted.Open("bogus-" + name)
924                 c.Check(err, check.NotNil)
925
926                 f, err := persisted.Open(name)
927                 c.Assert(err, check.IsNil)
928
929                 if data == nil {
930                         data = []byte{}
931                 }
932                 buf, err := ioutil.ReadAll(f)
933                 c.Check(err, check.IsNil)
934                 c.Check(buf, check.DeepEquals, data)
935         }
936
937         expectDir := map[string]int{
938                 "empty":           0,
939                 "not empty":       1,
940                 "not empty/empty": 0,
941         }
942         for name, expectLen := range expectDir {
943                 _, err := persisted.Open(name + "/bogus")
944                 c.Check(err, check.NotNil)
945
946                 d, err := persisted.Open(name)
947                 defer d.Close()
948                 c.Check(err, check.IsNil)
949                 fi, err := d.Readdir(-1)
950                 c.Check(err, check.IsNil)
951                 c.Check(fi, check.HasLen, expectLen)
952         }
953 }
954
955 func (s *CollectionFSSuite) TestOpenFileFlags(c *check.C) {
956         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
957         c.Assert(err, check.IsNil)
958
959         f, err := fs.OpenFile("missing", os.O_WRONLY, 0)
960         c.Check(f, check.IsNil)
961         c.Check(err, check.ErrorMatches, `file does not exist`)
962
963         f, err = fs.OpenFile("new", os.O_CREATE|os.O_RDONLY, 0)
964         c.Assert(err, check.IsNil)
965         defer f.Close()
966         n, err := f.Write([]byte{1, 2, 3})
967         c.Check(n, check.Equals, 0)
968         c.Check(err, check.ErrorMatches, `read-only file`)
969         n, err = f.Read(make([]byte, 1))
970         c.Check(n, check.Equals, 0)
971         c.Check(err, check.Equals, io.EOF)
972         f, err = fs.OpenFile("new", os.O_RDWR, 0)
973         c.Assert(err, check.IsNil)
974         defer f.Close()
975         _, err = f.Write([]byte{4, 5, 6})
976         c.Check(err, check.IsNil)
977         fi, err := f.Stat()
978         c.Assert(err, check.IsNil)
979         c.Check(fi.Size(), check.Equals, int64(3))
980
981         f, err = fs.OpenFile("new", os.O_TRUNC|os.O_RDWR, 0)
982         c.Assert(err, check.IsNil)
983         defer f.Close()
984         pos, err := f.Seek(0, io.SeekEnd)
985         c.Check(pos, check.Equals, int64(0))
986         c.Check(err, check.IsNil)
987         fi, err = f.Stat()
988         c.Assert(err, check.IsNil)
989         c.Check(fi.Size(), check.Equals, int64(0))
990         fs.Remove("new")
991
992         buf := make([]byte, 64)
993         f, err = fs.OpenFile("append", os.O_EXCL|os.O_CREATE|os.O_RDWR|os.O_APPEND, 0)
994         c.Assert(err, check.IsNil)
995         f.Write([]byte{1, 2, 3})
996         f.Seek(0, io.SeekStart)
997         n, _ = f.Read(buf[:1])
998         c.Check(n, check.Equals, 1)
999         c.Check(buf[:1], check.DeepEquals, []byte{1})
1000         pos, err = f.Seek(0, io.SeekCurrent)
1001         c.Assert(err, check.IsNil)
1002         c.Check(pos, check.Equals, int64(1))
1003         f.Write([]byte{4, 5, 6})
1004         pos, err = f.Seek(0, io.SeekCurrent)
1005         c.Assert(err, check.IsNil)
1006         c.Check(pos, check.Equals, int64(6))
1007         f.Seek(0, io.SeekStart)
1008         n, err = f.Read(buf)
1009         c.Check(buf[:n], check.DeepEquals, []byte{1, 2, 3, 4, 5, 6})
1010         c.Check(err, check.Equals, io.EOF)
1011         f.Close()
1012
1013         f, err = fs.OpenFile("append", os.O_RDWR|os.O_APPEND, 0)
1014         c.Assert(err, check.IsNil)
1015         pos, err = f.Seek(0, io.SeekCurrent)
1016         c.Check(pos, check.Equals, int64(0))
1017         c.Check(err, check.IsNil)
1018         f.Read(buf[:3])
1019         pos, _ = f.Seek(0, io.SeekCurrent)
1020         c.Check(pos, check.Equals, int64(3))
1021         f.Write([]byte{7, 8, 9})
1022         pos, err = f.Seek(0, io.SeekCurrent)
1023         c.Check(err, check.IsNil)
1024         c.Check(pos, check.Equals, int64(9))
1025         f.Close()
1026
1027         f, err = fs.OpenFile("wronly", os.O_CREATE|os.O_WRONLY, 0)
1028         c.Assert(err, check.IsNil)
1029         n, err = f.Write([]byte{3, 2, 1})
1030         c.Check(n, check.Equals, 3)
1031         c.Check(err, check.IsNil)
1032         pos, _ = f.Seek(0, io.SeekCurrent)
1033         c.Check(pos, check.Equals, int64(3))
1034         pos, _ = f.Seek(0, io.SeekStart)
1035         c.Check(pos, check.Equals, int64(0))
1036         n, err = f.Read(buf)
1037         c.Check(n, check.Equals, 0)
1038         c.Check(err, check.ErrorMatches, `.*O_WRONLY.*`)
1039         f, err = fs.OpenFile("wronly", os.O_RDONLY, 0)
1040         c.Assert(err, check.IsNil)
1041         n, _ = f.Read(buf)
1042         c.Check(buf[:n], check.DeepEquals, []byte{3, 2, 1})
1043
1044         f, err = fs.OpenFile("unsupported", os.O_CREATE|os.O_SYNC, 0)
1045         c.Check(f, check.IsNil)
1046         c.Check(err, check.NotNil)
1047
1048         f, err = fs.OpenFile("append", os.O_RDWR|os.O_WRONLY, 0)
1049         c.Check(f, check.IsNil)
1050         c.Check(err, check.ErrorMatches, `invalid flag.*`)
1051 }
1052
1053 func (s *CollectionFSSuite) TestFlushFullBlocksWritingLongFile(c *check.C) {
1054         defer func(cw, mbs int) {
1055                 concurrentWriters = cw
1056                 maxBlockSize = mbs
1057         }(concurrentWriters, maxBlockSize)
1058         concurrentWriters = 2
1059         maxBlockSize = 1024
1060
1061         proceed := make(chan struct{})
1062         var started, concurrent int32
1063         blk2done := false
1064         s.kc.onPut = func([]byte) {
1065                 atomic.AddInt32(&concurrent, 1)
1066                 switch atomic.AddInt32(&started, 1) {
1067                 case 1:
1068                         // Wait until block 2 starts and finishes, and block 3 starts
1069                         select {
1070                         case <-proceed:
1071                                 c.Check(blk2done, check.Equals, true)
1072                         case <-time.After(time.Second):
1073                                 c.Error("timed out")
1074                         }
1075                 case 2:
1076                         time.Sleep(time.Millisecond)
1077                         blk2done = true
1078                 case 3:
1079                         close(proceed)
1080                 default:
1081                         time.Sleep(time.Millisecond)
1082                 }
1083                 c.Check(atomic.AddInt32(&concurrent, -1) < int32(concurrentWriters), check.Equals, true)
1084         }
1085
1086         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1087         c.Assert(err, check.IsNil)
1088         f, err := fs.OpenFile("50K", os.O_WRONLY|os.O_CREATE, 0)
1089         c.Assert(err, check.IsNil)
1090         defer f.Close()
1091
1092         data := make([]byte, 500)
1093         rand.Read(data)
1094
1095         for i := 0; i < 100; i++ {
1096                 n, err := f.Write(data)
1097                 c.Assert(n, check.Equals, len(data))
1098                 c.Assert(err, check.IsNil)
1099         }
1100
1101         currentMemExtents := func() (memExtents []int) {
1102                 for idx, e := range f.(*filehandle).inode.(*filenode).segments {
1103                         switch e.(type) {
1104                         case *memSegment:
1105                                 memExtents = append(memExtents, idx)
1106                         }
1107                 }
1108                 return
1109         }
1110         f.(*filehandle).inode.(*filenode).waitPrune()
1111         c.Check(currentMemExtents(), check.HasLen, 1)
1112
1113         m, err := fs.MarshalManifest(".")
1114         c.Check(m, check.Matches, `[^:]* 0:50000:50K\n`)
1115         c.Check(err, check.IsNil)
1116         c.Check(currentMemExtents(), check.HasLen, 0)
1117 }
1118
1119 // Ensure blocks get flushed to disk if a lot of data is written to
1120 // small files/directories without calling sync().
1121 //
1122 // Write four 512KiB files into each of 256 top-level dirs (total
1123 // 512MiB), calling Flush() every 8 dirs. Ensure memory usage never
1124 // exceeds 24MiB (4 concurrentWriters * 2MiB + 8 unflushed dirs *
1125 // 2MiB).
1126 func (s *CollectionFSSuite) TestFlushAll(c *check.C) {
1127         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1128         c.Assert(err, check.IsNil)
1129
1130         s.kc.onPut = func([]byte) {
1131                 // discard flushed data -- otherwise the stub will use
1132                 // unlimited memory
1133                 time.Sleep(time.Millisecond)
1134                 s.kc.Lock()
1135                 defer s.kc.Unlock()
1136                 s.kc.blocks = map[string][]byte{}
1137         }
1138         for i := 0; i < 256; i++ {
1139                 buf := bytes.NewBuffer(make([]byte, 524288))
1140                 fmt.Fprintf(buf, "test file in dir%d", i)
1141
1142                 dir := fmt.Sprintf("dir%d", i)
1143                 fs.Mkdir(dir, 0755)
1144                 for j := 0; j < 2; j++ {
1145                         f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
1146                         c.Assert(err, check.IsNil)
1147                         defer f.Close()
1148                         _, err = io.Copy(f, buf)
1149                         c.Assert(err, check.IsNil)
1150                 }
1151
1152                 if i%8 == 0 {
1153                         fs.Flush("", true)
1154                 }
1155
1156                 size := fs.MemorySize()
1157                 if !c.Check(size <= 1<<24, check.Equals, true) {
1158                         c.Logf("at dir%d fs.MemorySize()=%d", i, size)
1159                         return
1160                 }
1161         }
1162 }
1163
1164 // Ensure short blocks at the end of a stream don't get flushed by
1165 // Flush(false).
1166 //
1167 // Write 67x 1MiB files to each of 8 dirs, and check that 8 full 64MiB
1168 // blocks have been flushed while 8x 3MiB is still buffered in memory.
1169 func (s *CollectionFSSuite) TestFlushFullBlocksOnly(c *check.C) {
1170         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1171         c.Assert(err, check.IsNil)
1172
1173         var flushed int64
1174         s.kc.onPut = func(p []byte) {
1175                 atomic.AddInt64(&flushed, int64(len(p)))
1176         }
1177
1178         nDirs := int64(8)
1179         megabyte := make([]byte, 1<<20)
1180         for i := int64(0); i < nDirs; i++ {
1181                 dir := fmt.Sprintf("dir%d", i)
1182                 fs.Mkdir(dir, 0755)
1183                 for j := 0; j < 67; j++ {
1184                         f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
1185                         c.Assert(err, check.IsNil)
1186                         defer f.Close()
1187                         _, err = f.Write(megabyte)
1188                         c.Assert(err, check.IsNil)
1189                 }
1190         }
1191         c.Check(fs.MemorySize(), check.Equals, int64(nDirs*67<<20))
1192         c.Check(flushed, check.Equals, int64(0))
1193
1194         waitForFlush := func(expectUnflushed, expectFlushed int64) {
1195                 for deadline := time.Now().Add(5 * time.Second); fs.MemorySize() > expectUnflushed && time.Now().Before(deadline); time.Sleep(10 * time.Millisecond) {
1196                 }
1197                 c.Check(fs.MemorySize(), check.Equals, expectUnflushed)
1198                 c.Check(flushed, check.Equals, expectFlushed)
1199         }
1200
1201         // Nothing flushed yet
1202         waitForFlush((nDirs*67)<<20, 0)
1203
1204         // Flushing a non-empty dir "/" is non-recursive and there are
1205         // no top-level files, so this has no effect
1206         fs.Flush("/", false)
1207         waitForFlush((nDirs*67)<<20, 0)
1208
1209         // Flush the full block in dir0
1210         fs.Flush("dir0", false)
1211         waitForFlush((nDirs*67-64)<<20, 64<<20)
1212
1213         err = fs.Flush("dir-does-not-exist", false)
1214         c.Check(err, check.NotNil)
1215
1216         // Flush full blocks in all dirs
1217         fs.Flush("", false)
1218         waitForFlush(nDirs*3<<20, nDirs*64<<20)
1219
1220         // Flush non-full blocks, too
1221         fs.Flush("", true)
1222         waitForFlush(0, nDirs*67<<20)
1223 }
1224
1225 // Even when writing lots of files/dirs from different goroutines, as
1226 // long as Flush(dir,false) is called after writing each file,
1227 // unflushed data should be limited to one full block per
1228 // concurrentWriter, plus one nearly-full block at the end of each
1229 // dir/stream.
1230 func (s *CollectionFSSuite) TestMaxUnflushed(c *check.C) {
1231         nDirs := int64(8)
1232         maxUnflushed := (int64(concurrentWriters) + nDirs) << 26
1233
1234         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1235         c.Assert(err, check.IsNil)
1236
1237         release := make(chan struct{})
1238         timeout := make(chan struct{})
1239         time.AfterFunc(10*time.Second, func() { close(timeout) })
1240         var putCount, concurrency int64
1241         var unflushed int64
1242         s.kc.onPut = func(p []byte) {
1243                 defer atomic.AddInt64(&unflushed, -int64(len(p)))
1244                 cur := atomic.AddInt64(&concurrency, 1)
1245                 defer atomic.AddInt64(&concurrency, -1)
1246                 pc := atomic.AddInt64(&putCount, 1)
1247                 if pc < int64(concurrentWriters) {
1248                         // Block until we reach concurrentWriters, to
1249                         // make sure we're really accepting concurrent
1250                         // writes.
1251                         select {
1252                         case <-release:
1253                         case <-timeout:
1254                                 c.Error("timeout")
1255                         }
1256                 } else if pc == int64(concurrentWriters) {
1257                         // Unblock the first N-1 PUT reqs.
1258                         close(release)
1259                 }
1260                 c.Assert(cur <= int64(concurrentWriters), check.Equals, true)
1261                 c.Assert(atomic.LoadInt64(&unflushed) <= maxUnflushed, check.Equals, true)
1262         }
1263
1264         var owg sync.WaitGroup
1265         megabyte := make([]byte, 1<<20)
1266         for i := int64(0); i < nDirs; i++ {
1267                 dir := fmt.Sprintf("dir%d", i)
1268                 fs.Mkdir(dir, 0755)
1269                 owg.Add(1)
1270                 go func() {
1271                         defer owg.Done()
1272                         defer fs.Flush(dir, true)
1273                         var iwg sync.WaitGroup
1274                         defer iwg.Wait()
1275                         for j := 0; j < 67; j++ {
1276                                 iwg.Add(1)
1277                                 go func(j int) {
1278                                         defer iwg.Done()
1279                                         f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
1280                                         c.Assert(err, check.IsNil)
1281                                         defer f.Close()
1282                                         n, err := f.Write(megabyte)
1283                                         c.Assert(err, check.IsNil)
1284                                         atomic.AddInt64(&unflushed, int64(n))
1285                                         fs.Flush(dir, false)
1286                                 }(j)
1287                         }
1288                 }()
1289         }
1290         owg.Wait()
1291         fs.Flush("", true)
1292 }
1293
1294 func (s *CollectionFSSuite) TestFlushStress(c *check.C) {
1295         done := false
1296         defer func() { done = true }()
1297         time.AfterFunc(10*time.Second, func() {
1298                 if !done {
1299                         pprof.Lookup("goroutine").WriteTo(os.Stderr, 1)
1300                         panic("timeout")
1301                 }
1302         })
1303
1304         wrote := 0
1305         s.kc.onPut = func(p []byte) {
1306                 s.kc.Lock()
1307                 s.kc.blocks = map[string][]byte{}
1308                 wrote++
1309                 defer c.Logf("wrote block %d, %d bytes", wrote, len(p))
1310                 s.kc.Unlock()
1311                 time.Sleep(20 * time.Millisecond)
1312         }
1313
1314         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1315         c.Assert(err, check.IsNil)
1316
1317         data := make([]byte, 1<<20)
1318         for i := 0; i < 3; i++ {
1319                 dir := fmt.Sprintf("dir%d", i)
1320                 fs.Mkdir(dir, 0755)
1321                 for j := 0; j < 200; j++ {
1322                         data[0] = byte(j)
1323                         f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
1324                         c.Assert(err, check.IsNil)
1325                         _, err = f.Write(data)
1326                         c.Assert(err, check.IsNil)
1327                         f.Close()
1328                         fs.Flush(dir, false)
1329                 }
1330                 _, err := fs.MarshalManifest(".")
1331                 c.Check(err, check.IsNil)
1332         }
1333 }
1334
1335 func (s *CollectionFSSuite) TestFlushShort(c *check.C) {
1336         s.kc.onPut = func([]byte) {
1337                 s.kc.Lock()
1338                 s.kc.blocks = map[string][]byte{}
1339                 s.kc.Unlock()
1340         }
1341         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1342         c.Assert(err, check.IsNil)
1343         for _, blocksize := range []int{8, 1000000} {
1344                 dir := fmt.Sprintf("dir%d", blocksize)
1345                 err = fs.Mkdir(dir, 0755)
1346                 c.Assert(err, check.IsNil)
1347                 data := make([]byte, blocksize)
1348                 for i := 0; i < 100; i++ {
1349                         f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, i), os.O_WRONLY|os.O_CREATE, 0)
1350                         c.Assert(err, check.IsNil)
1351                         _, err = f.Write(data)
1352                         c.Assert(err, check.IsNil)
1353                         f.Close()
1354                         fs.Flush(dir, false)
1355                 }
1356                 fs.Flush(dir, true)
1357                 _, err := fs.MarshalManifest(".")
1358                 c.Check(err, check.IsNil)
1359         }
1360 }
1361
1362 func (s *CollectionFSSuite) TestBrokenManifests(c *check.C) {
1363         for _, txt := range []string{
1364                 "\n",
1365                 ".\n",
1366                 ". \n",
1367                 ". d41d8cd98f00b204e9800998ecf8427e+0\n",
1368                 ". d41d8cd98f00b204e9800998ecf8427e+0 \n",
1369                 ". 0:0:foo\n",
1370                 ".  0:0:foo\n",
1371                 ". 0:0:foo 0:0:bar\n",
1372                 ". d41d8cd98f00b204e9800998ecf8427e 0:0:foo\n",
1373                 ". d41d8cd98f00b204e9800998ecf8427e+0 :0:0:foo\n",
1374                 ". d41d8cd98f00b204e9800998ecf8427e+0 foo:0:foo\n",
1375                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:foo:foo\n",
1376                 ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:foo 1:1:bar\n",
1377                 ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:\\056\n",
1378                 ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:\\056\\057\\056\n",
1379                 ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:.\n",
1380                 ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:..\n",
1381                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:..\n",
1382                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/..\n",
1383                 ". d41d8cd98f00b204e9800998ecf8427e+1 0:0:foo\n./foo d41d8cd98f00b204e9800998ecf8427e+1 0:0:bar\n",
1384                 "./foo d41d8cd98f00b204e9800998ecf8427e+1 0:0:bar\n. d41d8cd98f00b204e9800998ecf8427e+1 0:0:foo\n",
1385         } {
1386                 c.Logf("<-%q", txt)
1387                 fs, err := (&Collection{ManifestText: txt}).FileSystem(s.client, s.kc)
1388                 c.Check(fs, check.IsNil)
1389                 c.Logf("-> %s", err)
1390                 c.Check(err, check.NotNil)
1391         }
1392 }
1393
1394 func (s *CollectionFSSuite) TestEdgeCaseManifests(c *check.C) {
1395         for _, txt := range []string{
1396                 "",
1397                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo\n",
1398                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:...\n",
1399                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:. 0:0:. 0:0:\\056 0:0:\\056\n",
1400                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/. 0:0:. 0:0:foo\\057bar\\057\\056\n",
1401                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo 0:0:foo 0:0:bar\n",
1402                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/bar\n./foo d41d8cd98f00b204e9800998ecf8427e+0 0:0:bar\n",
1403         } {
1404                 c.Logf("<-%q", txt)
1405                 fs, err := (&Collection{ManifestText: txt}).FileSystem(s.client, s.kc)
1406                 c.Check(err, check.IsNil)
1407                 c.Check(fs, check.NotNil)
1408         }
1409 }
1410
1411 func (s *CollectionFSSuite) checkMemSize(c *check.C, f File) {
1412         fn := f.(*filehandle).inode.(*filenode)
1413         var memsize int64
1414         for _, seg := range fn.segments {
1415                 if e, ok := seg.(*memSegment); ok {
1416                         memsize += int64(len(e.buf))
1417                 }
1418         }
1419         c.Check(fn.memsize, check.Equals, memsize)
1420 }
1421
1422 type CollectionFSUnitSuite struct{}
1423
1424 var _ = check.Suite(&CollectionFSUnitSuite{})
1425
1426 // expect ~2 seconds to load a manifest with 256K files
1427 func (s *CollectionFSUnitSuite) TestLargeManifest(c *check.C) {
1428         if testing.Short() {
1429                 c.Skip("slow")
1430         }
1431
1432         const (
1433                 dirCount  = 512
1434                 fileCount = 512
1435         )
1436
1437         mb := bytes.NewBuffer(make([]byte, 0, 40000000))
1438         for i := 0; i < dirCount; i++ {
1439                 fmt.Fprintf(mb, "./dir%d", i)
1440                 for j := 0; j <= fileCount; j++ {
1441                         fmt.Fprintf(mb, " %032x+42+A%040x@%08x", j, j, j)
1442                 }
1443                 for j := 0; j < fileCount; j++ {
1444                         fmt.Fprintf(mb, " %d:%d:dir%d/file%d", j*42+21, 42, j, j)
1445                 }
1446                 mb.Write([]byte{'\n'})
1447         }
1448         coll := Collection{ManifestText: mb.String()}
1449         c.Logf("%s built", time.Now())
1450
1451         var memstats runtime.MemStats
1452         runtime.ReadMemStats(&memstats)
1453         c.Logf("%s Alloc=%d Sys=%d", time.Now(), memstats.Alloc, memstats.Sys)
1454
1455         f, err := coll.FileSystem(nil, nil)
1456         c.Check(err, check.IsNil)
1457         c.Logf("%s loaded", time.Now())
1458         c.Check(f.Size(), check.Equals, int64(42*dirCount*fileCount))
1459
1460         for i := 0; i < dirCount; i++ {
1461                 for j := 0; j < fileCount; j++ {
1462                         f.Stat(fmt.Sprintf("./dir%d/dir%d/file%d", i, j, j))
1463                 }
1464         }
1465         c.Logf("%s Stat() x %d", time.Now(), dirCount*fileCount)
1466
1467         runtime.ReadMemStats(&memstats)
1468         c.Logf("%s Alloc=%d Sys=%d", time.Now(), memstats.Alloc, memstats.Sys)
1469 }
1470
1471 // Gocheck boilerplate
1472 func Test(t *testing.T) {
1473         check.TestingT(t)
1474 }