15954: Move PortableDataHash func to sdk/go/arvados.
[arvados.git] / sdk / go / arvados / fs_collection_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package arvados
6
7 import (
8         "bytes"
9         "crypto/md5"
10         "crypto/sha1"
11         "errors"
12         "fmt"
13         "io"
14         "io/ioutil"
15         "math/rand"
16         "net/http"
17         "os"
18         "regexp"
19         "runtime"
20         "runtime/pprof"
21         "strings"
22         "sync"
23         "sync/atomic"
24         "testing"
25         "time"
26
27         check "gopkg.in/check.v1"
28 )
29
30 var _ = check.Suite(&CollectionFSSuite{})
31
// keepClientStub is an in-memory block store used by the tests in
// this file in place of a real Keep client. Blocks are indexed by the
// first 32 characters of their locator. The zero value is usable:
// PutB allocates the block map on demand.
type keepClientStub struct {
	blocks      map[string][]byte    // block data, keyed by locator[:32]
	refreshable map[string]bool      // keys for which LocalLocator accepts a +R locator even though the block is not stored
	onPut       func(bufcopy []byte) // called from PutB, before acquiring lock
	sync.RWMutex
}

// errStub404 is returned by ReadAt when the requested block is not
// in the stub's block map.
var errStub404 = errors.New("404 block not found")

// ReadAt copies data from the block identified by locator, starting
// at byte offset off, into p, and returns the number of bytes copied.
//
// It returns an error (instead of panicking) if the locator is too
// short to contain a 32-character hash or if off lies outside the
// block, and errStub404 if the block is not stored.
func (kcs *keepClientStub) ReadAt(locator string, p []byte, off int) (int, error) {
	if len(locator) < 32 {
		return 0, fmt.Errorf("bad locator: %q", locator)
	}
	kcs.RLock()
	defer kcs.RUnlock()
	buf := kcs.blocks[locator[:32]]
	if buf == nil {
		return 0, errStub404
	}
	// off == len(buf) is allowed and yields a zero-length read.
	if off < 0 || off > len(buf) {
		return 0, fmt.Errorf("offset %d out of range for %d-byte block %q", off, len(buf), locator)
	}
	return copy(p, buf[off:]), nil
}

// PutB stores a private copy of p and returns a locator of the form
// "<md5 hex>+<size>+A12345@abcde", a replica count of 1, and a nil
// error. If onPut is set, it is called with the copy before the lock
// is acquired.
func (kcs *keepClientStub) PutB(p []byte) (string, int, error) {
	locator := fmt.Sprintf("%x+%d+A12345@abcde", md5.Sum(p), len(p))
	buf := make([]byte, len(p))
	copy(buf, p)
	if kcs.onPut != nil {
		kcs.onPut(buf)
	}
	kcs.Lock()
	defer kcs.Unlock()
	if kcs.blocks == nil {
		// Writing to a nil map panics; allow a zero-value stub.
		kcs.blocks = make(map[string][]byte)
	}
	kcs.blocks[locator[:32]] = buf
	return locator, 1, nil
}

// localOrRemoteSignature matches a "+A..." or "+R..." locator hint.
var localOrRemoteSignature = regexp.MustCompile(`\+[AR][^+]*`)

// LocalLocator returns locator with every +A/+R hint replaced by a
// freshly generated fake +A signature that expires 14 days from now.
//
// A locator containing "+R" is rejected if it is shorter than 32
// characters, or if its block is neither stored in the stub nor
// listed in kcs.refreshable.
func (kcs *keepClientStub) LocalLocator(locator string) (string, error) {
	// Only reads shared state, so a read lock is sufficient.
	kcs.RLock()
	defer kcs.RUnlock()
	if strings.Contains(locator, "+R") {
		if len(locator) < 32 {
			return "", fmt.Errorf("bad locator: %q", locator)
		}
		hash := locator[:32]
		if _, ok := kcs.blocks[hash]; !ok && !kcs.refreshable[hash] {
			return "", fmt.Errorf("kcs.refreshable[%q]==false", locator)
		}
	}
	fakeSig := fmt.Sprintf("+A%x@%x", sha1.Sum(nil), time.Now().Add(time.Hour*24*14).Unix())
	return localOrRemoteSignature.ReplaceAllLiteralString(locator, fakeSig), nil
}
80
81 type CollectionFSSuite struct {
82         client *Client
83         coll   Collection
84         fs     CollectionFileSystem
85         kc     *keepClientStub
86 }
87
88 func (s *CollectionFSSuite) SetUpTest(c *check.C) {
89         s.client = NewClientFromEnv()
90         err := s.client.RequestAndDecode(&s.coll, "GET", "arvados/v1/collections/"+fixtureFooAndBarFilesInDirUUID, nil, nil)
91         c.Assert(err, check.IsNil)
92         s.kc = &keepClientStub{
93                 blocks: map[string][]byte{
94                         "3858f62230ac3c915f300c664312c63f": []byte("foobar"),
95                 }}
96         s.fs, err = s.coll.FileSystem(s.client, s.kc)
97         c.Assert(err, check.IsNil)
98 }
99
100 func (s *CollectionFSSuite) TestHttpFileSystemInterface(c *check.C) {
101         _, ok := s.fs.(http.FileSystem)
102         c.Check(ok, check.Equals, true)
103 }
104
105 func (s *CollectionFSSuite) TestColonInFilename(c *check.C) {
106         fs, err := (&Collection{
107                 ManifestText: "./foo:foo 3858f62230ac3c915f300c664312c63f+3 0:3:bar:bar\n",
108         }).FileSystem(s.client, s.kc)
109         c.Assert(err, check.IsNil)
110
111         f, err := fs.Open("/foo:foo")
112         c.Assert(err, check.IsNil)
113
114         fis, err := f.Readdir(0)
115         c.Check(err, check.IsNil)
116         c.Check(len(fis), check.Equals, 1)
117         c.Check(fis[0].Name(), check.Equals, "bar:bar")
118 }
119
120 func (s *CollectionFSSuite) TestReaddirFull(c *check.C) {
121         f, err := s.fs.Open("/dir1")
122         c.Assert(err, check.IsNil)
123
124         st, err := f.Stat()
125         c.Assert(err, check.IsNil)
126         c.Check(st.Size(), check.Equals, int64(2))
127         c.Check(st.IsDir(), check.Equals, true)
128
129         fis, err := f.Readdir(0)
130         c.Check(err, check.IsNil)
131         c.Check(len(fis), check.Equals, 2)
132         if len(fis) > 0 {
133                 c.Check(fis[0].Size(), check.Equals, int64(3))
134         }
135 }
136
137 func (s *CollectionFSSuite) TestReaddirLimited(c *check.C) {
138         f, err := s.fs.Open("./dir1")
139         c.Assert(err, check.IsNil)
140
141         fis, err := f.Readdir(1)
142         c.Check(err, check.IsNil)
143         c.Check(len(fis), check.Equals, 1)
144         if len(fis) > 0 {
145                 c.Check(fis[0].Size(), check.Equals, int64(3))
146         }
147
148         fis, err = f.Readdir(1)
149         c.Check(err, check.IsNil)
150         c.Check(len(fis), check.Equals, 1)
151         if len(fis) > 0 {
152                 c.Check(fis[0].Size(), check.Equals, int64(3))
153         }
154
155         fis, err = f.Readdir(1)
156         c.Check(len(fis), check.Equals, 0)
157         c.Check(err, check.NotNil)
158         c.Check(err, check.Equals, io.EOF)
159
160         f, err = s.fs.Open("dir1")
161         c.Assert(err, check.IsNil)
162         fis, err = f.Readdir(1)
163         c.Check(len(fis), check.Equals, 1)
164         c.Assert(err, check.IsNil)
165         fis, err = f.Readdir(2)
166         c.Check(len(fis), check.Equals, 1)
167         c.Assert(err, check.IsNil)
168         fis, err = f.Readdir(2)
169         c.Check(len(fis), check.Equals, 0)
170         c.Assert(err, check.Equals, io.EOF)
171 }
172
173 func (s *CollectionFSSuite) TestPathMunge(c *check.C) {
174         for _, path := range []string{".", "/", "./", "///", "/../", "/./.."} {
175                 f, err := s.fs.Open(path)
176                 c.Assert(err, check.IsNil)
177
178                 st, err := f.Stat()
179                 c.Assert(err, check.IsNil)
180                 c.Check(st.Size(), check.Equals, int64(1))
181                 c.Check(st.IsDir(), check.Equals, true)
182         }
183         for _, path := range []string{"/dir1", "dir1", "./dir1", "///dir1//.//", "../dir1/../dir1/"} {
184                 c.Logf("%q", path)
185                 f, err := s.fs.Open(path)
186                 c.Assert(err, check.IsNil)
187
188                 st, err := f.Stat()
189                 c.Assert(err, check.IsNil)
190                 c.Check(st.Size(), check.Equals, int64(2))
191                 c.Check(st.IsDir(), check.Equals, true)
192         }
193 }
194
195 func (s *CollectionFSSuite) TestNotExist(c *check.C) {
196         for _, path := range []string{"/no", "no", "./no", "n/o", "/n/o"} {
197                 f, err := s.fs.Open(path)
198                 c.Assert(f, check.IsNil)
199                 c.Assert(err, check.NotNil)
200                 c.Assert(os.IsNotExist(err), check.Equals, true)
201         }
202 }
203
204 func (s *CollectionFSSuite) TestReadOnlyFile(c *check.C) {
205         f, err := s.fs.OpenFile("/dir1/foo", os.O_RDONLY, 0)
206         c.Assert(err, check.IsNil)
207         st, err := f.Stat()
208         c.Assert(err, check.IsNil)
209         c.Check(st.Size(), check.Equals, int64(3))
210         n, err := f.Write([]byte("bar"))
211         c.Check(n, check.Equals, 0)
212         c.Check(err, check.Equals, ErrReadOnlyFile)
213 }
214
215 func (s *CollectionFSSuite) TestCreateFile(c *check.C) {
216         f, err := s.fs.OpenFile("/new-file 1", os.O_RDWR|os.O_CREATE, 0)
217         c.Assert(err, check.IsNil)
218         st, err := f.Stat()
219         c.Assert(err, check.IsNil)
220         c.Check(st.Size(), check.Equals, int64(0))
221
222         n, err := f.Write([]byte("bar"))
223         c.Check(n, check.Equals, 3)
224         c.Check(err, check.IsNil)
225
226         c.Check(f.Close(), check.IsNil)
227
228         f, err = s.fs.OpenFile("/new-file 1", os.O_RDWR|os.O_CREATE|os.O_EXCL, 0)
229         c.Check(f, check.IsNil)
230         c.Assert(err, check.NotNil)
231
232         f, err = s.fs.OpenFile("/new-file 1", os.O_RDWR, 0)
233         c.Assert(err, check.IsNil)
234         st, err = f.Stat()
235         c.Assert(err, check.IsNil)
236         c.Check(st.Size(), check.Equals, int64(3))
237
238         c.Check(f.Close(), check.IsNil)
239
240         m, err := s.fs.MarshalManifest(".")
241         c.Assert(err, check.IsNil)
242         c.Check(m, check.Matches, `. 37b51d194a7513e45b56f6524f2d51f2\+3\+\S+ 0:3:new-file\\0401\n./dir1 .* 3:3:bar 0:3:foo\n`)
243 }
244
245 func (s *CollectionFSSuite) TestReadWriteFile(c *check.C) {
246         maxBlockSize = 8
247         defer func() { maxBlockSize = 2 << 26 }()
248
249         f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
250         c.Assert(err, check.IsNil)
251         defer f.Close()
252         st, err := f.Stat()
253         c.Assert(err, check.IsNil)
254         c.Check(st.Size(), check.Equals, int64(3))
255
256         f2, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
257         c.Assert(err, check.IsNil)
258         defer f2.Close()
259
260         buf := make([]byte, 64)
261         n, err := f.Read(buf)
262         c.Check(n, check.Equals, 3)
263         c.Check(err, check.Equals, io.EOF)
264         c.Check(string(buf[:3]), check.DeepEquals, "foo")
265
266         pos, err := f.Seek(-2, io.SeekCurrent)
267         c.Check(pos, check.Equals, int64(1))
268         c.Check(err, check.IsNil)
269
270         // Split a storedExtent in two, and insert a memExtent
271         n, err = f.Write([]byte("*"))
272         c.Check(n, check.Equals, 1)
273         c.Check(err, check.IsNil)
274
275         pos, err = f.Seek(0, io.SeekCurrent)
276         c.Check(pos, check.Equals, int64(2))
277         c.Check(err, check.IsNil)
278
279         pos, err = f.Seek(0, io.SeekStart)
280         c.Check(pos, check.Equals, int64(0))
281         c.Check(err, check.IsNil)
282
283         rbuf, err := ioutil.ReadAll(f)
284         c.Check(len(rbuf), check.Equals, 3)
285         c.Check(err, check.IsNil)
286         c.Check(string(rbuf), check.Equals, "f*o")
287
288         // Write multiple blocks in one call
289         f.Seek(1, io.SeekStart)
290         n, err = f.Write([]byte("0123456789abcdefg"))
291         c.Check(n, check.Equals, 17)
292         c.Check(err, check.IsNil)
293         pos, err = f.Seek(0, io.SeekCurrent)
294         c.Check(pos, check.Equals, int64(18))
295         c.Check(err, check.IsNil)
296         pos, err = f.Seek(-18, io.SeekCurrent)
297         c.Check(pos, check.Equals, int64(0))
298         c.Check(err, check.IsNil)
299         n, err = io.ReadFull(f, buf)
300         c.Check(n, check.Equals, 18)
301         c.Check(err, check.Equals, io.ErrUnexpectedEOF)
302         c.Check(string(buf[:n]), check.Equals, "f0123456789abcdefg")
303
304         buf2, err := ioutil.ReadAll(f2)
305         c.Check(err, check.IsNil)
306         c.Check(string(buf2), check.Equals, "f0123456789abcdefg")
307
308         // truncate to current size
309         err = f.Truncate(18)
310         c.Check(err, check.IsNil)
311         f2.Seek(0, io.SeekStart)
312         buf2, err = ioutil.ReadAll(f2)
313         c.Check(err, check.IsNil)
314         c.Check(string(buf2), check.Equals, "f0123456789abcdefg")
315
316         // shrink to zero some data
317         f.Truncate(15)
318         f2.Seek(0, io.SeekStart)
319         buf2, err = ioutil.ReadAll(f2)
320         c.Check(err, check.IsNil)
321         c.Check(string(buf2), check.Equals, "f0123456789abcd")
322
323         // grow to partial block/extent
324         f.Truncate(20)
325         f2.Seek(0, io.SeekStart)
326         buf2, err = ioutil.ReadAll(f2)
327         c.Check(err, check.IsNil)
328         c.Check(string(buf2), check.Equals, "f0123456789abcd\x00\x00\x00\x00\x00")
329
330         f.Truncate(0)
331         f2.Seek(0, io.SeekStart)
332         f2.Write([]byte("12345678abcdefghijkl"))
333
334         // grow to block/extent boundary
335         f.Truncate(64)
336         f2.Seek(0, io.SeekStart)
337         buf2, err = ioutil.ReadAll(f2)
338         c.Check(err, check.IsNil)
339         c.Check(len(buf2), check.Equals, 64)
340         c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 8)
341
342         // shrink to block/extent boundary
343         err = f.Truncate(32)
344         c.Check(err, check.IsNil)
345         f2.Seek(0, io.SeekStart)
346         buf2, err = ioutil.ReadAll(f2)
347         c.Check(err, check.IsNil)
348         c.Check(len(buf2), check.Equals, 32)
349         c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 4)
350
351         // shrink to partial block/extent
352         err = f.Truncate(15)
353         c.Check(err, check.IsNil)
354         f2.Seek(0, io.SeekStart)
355         buf2, err = ioutil.ReadAll(f2)
356         c.Check(err, check.IsNil)
357         c.Check(string(buf2), check.Equals, "12345678abcdefg")
358         c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 2)
359
360         // Force flush to ensure the block "12345678" gets stored, so
361         // we know what to expect in the final manifest below.
362         _, err = s.fs.MarshalManifest(".")
363         c.Check(err, check.IsNil)
364
365         // Truncate to size=3 while f2's ptr is at 15
366         err = f.Truncate(3)
367         c.Check(err, check.IsNil)
368         buf2, err = ioutil.ReadAll(f2)
369         c.Check(err, check.IsNil)
370         c.Check(string(buf2), check.Equals, "")
371         f2.Seek(0, io.SeekStart)
372         buf2, err = ioutil.ReadAll(f2)
373         c.Check(err, check.IsNil)
374         c.Check(string(buf2), check.Equals, "123")
375         c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 1)
376
377         m, err := s.fs.MarshalManifest(".")
378         c.Check(err, check.IsNil)
379         m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "")
380         c.Check(m, check.Equals, "./dir1 3858f62230ac3c915f300c664312c63f+6 25d55ad283aa400af464c76d713c07ad+8 3:3:bar 6:3:foo\n")
381         c.Check(s.fs.Size(), check.Equals, int64(6))
382 }
383
384 func (s *CollectionFSSuite) TestSeekSparse(c *check.C) {
385         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
386         c.Assert(err, check.IsNil)
387         f, err := fs.OpenFile("test", os.O_CREATE|os.O_RDWR, 0755)
388         c.Assert(err, check.IsNil)
389         defer f.Close()
390
391         checkSize := func(size int64) {
392                 fi, err := f.Stat()
393                 c.Assert(err, check.IsNil)
394                 c.Check(fi.Size(), check.Equals, size)
395
396                 f, err := fs.OpenFile("test", os.O_CREATE|os.O_RDWR, 0755)
397                 c.Assert(err, check.IsNil)
398                 defer f.Close()
399                 fi, err = f.Stat()
400                 c.Check(err, check.IsNil)
401                 c.Check(fi.Size(), check.Equals, size)
402                 pos, err := f.Seek(0, io.SeekEnd)
403                 c.Check(err, check.IsNil)
404                 c.Check(pos, check.Equals, size)
405         }
406
407         f.Seek(2, io.SeekEnd)
408         checkSize(0)
409         f.Write([]byte{1})
410         checkSize(3)
411
412         f.Seek(2, io.SeekCurrent)
413         checkSize(3)
414         f.Write([]byte{})
415         checkSize(5)
416
417         f.Seek(8, io.SeekStart)
418         checkSize(5)
419         n, err := f.Read(make([]byte, 1))
420         c.Check(n, check.Equals, 0)
421         c.Check(err, check.Equals, io.EOF)
422         checkSize(5)
423         f.Write([]byte{1, 2, 3})
424         checkSize(11)
425 }
426
427 func (s *CollectionFSSuite) TestMarshalCopiesRemoteBlocks(c *check.C) {
428         foo := "foo"
429         bar := "bar"
430         hash := map[string]string{
431                 foo: fmt.Sprintf("%x", md5.Sum([]byte(foo))),
432                 bar: fmt.Sprintf("%x", md5.Sum([]byte(bar))),
433         }
434
435         fs, err := (&Collection{
436                 ManifestText: ". " + hash[foo] + "+3+Rzaaaa-foo@bab " + hash[bar] + "+3+A12345@ffffff 0:2:fo.txt 2:4:obar.txt\n",
437         }).FileSystem(s.client, s.kc)
438         c.Assert(err, check.IsNil)
439         manifest, err := fs.MarshalManifest(".")
440         c.Check(manifest, check.Equals, "")
441         c.Check(err, check.NotNil)
442
443         s.kc.refreshable = map[string]bool{hash[bar]: true}
444
445         for _, sigIn := range []string{"Rzaaaa-foo@bab", "A12345@abcde"} {
446                 fs, err = (&Collection{
447                         ManifestText: ". " + hash[foo] + "+3+A12345@fffff " + hash[bar] + "+3+" + sigIn + " 0:2:fo.txt 2:4:obar.txt\n",
448                 }).FileSystem(s.client, s.kc)
449                 c.Assert(err, check.IsNil)
450                 manifest, err := fs.MarshalManifest(".")
451                 c.Check(err, check.IsNil)
452                 // Both blocks should now have +A signatures.
453                 c.Check(manifest, check.Matches, `.*\+A.* .*\+A.*\n`)
454                 c.Check(manifest, check.Not(check.Matches), `.*\+R.*\n`)
455         }
456 }
457
458 func (s *CollectionFSSuite) TestMarshalSmallBlocks(c *check.C) {
459         maxBlockSize = 8
460         defer func() { maxBlockSize = 2 << 26 }()
461
462         var err error
463         s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
464         c.Assert(err, check.IsNil)
465         for _, name := range []string{"foo", "bar", "baz"} {
466                 f, err := s.fs.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0)
467                 c.Assert(err, check.IsNil)
468                 f.Write([]byte(name))
469                 f.Close()
470         }
471
472         m, err := s.fs.MarshalManifest(".")
473         c.Check(err, check.IsNil)
474         m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "")
475         c.Check(m, check.Equals, ". c3c23db5285662ef7172373df0003206+6 acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:bar 3:3:baz 6:3:foo\n")
476 }
477
478 func (s *CollectionFSSuite) TestMkdir(c *check.C) {
479         err := s.fs.Mkdir("foo/bar", 0755)
480         c.Check(err, check.Equals, os.ErrNotExist)
481
482         f, err := s.fs.OpenFile("foo/bar", os.O_CREATE, 0)
483         c.Check(err, check.Equals, os.ErrNotExist)
484
485         err = s.fs.Mkdir("foo", 0755)
486         c.Check(err, check.IsNil)
487
488         f, err = s.fs.OpenFile("foo/bar", os.O_CREATE|os.O_WRONLY, 0)
489         c.Check(err, check.IsNil)
490         if err == nil {
491                 defer f.Close()
492                 f.Write([]byte("foo"))
493         }
494
495         // mkdir fails if a file already exists with that name
496         err = s.fs.Mkdir("foo/bar", 0755)
497         c.Check(err, check.NotNil)
498
499         err = s.fs.Remove("foo/bar")
500         c.Check(err, check.IsNil)
501
502         // mkdir succeeds after the file is deleted
503         err = s.fs.Mkdir("foo/bar", 0755)
504         c.Check(err, check.IsNil)
505
506         // creating a file in a nonexistent subdir should still fail
507         f, err = s.fs.OpenFile("foo/bar/baz/foo.txt", os.O_CREATE|os.O_WRONLY, 0)
508         c.Check(err, check.Equals, os.ErrNotExist)
509
510         f, err = s.fs.OpenFile("foo/bar/foo.txt", os.O_CREATE|os.O_WRONLY, 0)
511         c.Check(err, check.IsNil)
512         if err == nil {
513                 defer f.Close()
514                 f.Write([]byte("foo"))
515         }
516
517         // creating foo/bar as a regular file should fail
518         f, err = s.fs.OpenFile("foo/bar", os.O_CREATE|os.O_EXCL, 0)
519         c.Check(err, check.NotNil)
520
521         // creating foo/bar as a directory should fail
522         f, err = s.fs.OpenFile("foo/bar", os.O_CREATE|os.O_EXCL, os.ModeDir)
523         c.Check(err, check.NotNil)
524         err = s.fs.Mkdir("foo/bar", 0755)
525         c.Check(err, check.NotNil)
526
527         m, err := s.fs.MarshalManifest(".")
528         c.Check(err, check.IsNil)
529         m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "")
530         c.Check(m, check.Equals, "./dir1 3858f62230ac3c915f300c664312c63f+6 3:3:bar 0:3:foo\n./foo/bar acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt\n")
531 }
532
533 func (s *CollectionFSSuite) TestConcurrentWriters(c *check.C) {
534         if testing.Short() {
535                 c.Skip("slow")
536         }
537
538         maxBlockSize = 8
539         defer func() { maxBlockSize = 1 << 26 }()
540
541         var wg sync.WaitGroup
542         for n := 0; n < 128; n++ {
543                 wg.Add(1)
544                 go func() {
545                         defer wg.Done()
546                         f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
547                         c.Assert(err, check.IsNil)
548                         defer f.Close()
549                         for i := 0; i < 1024; i++ {
550                                 r := rand.Uint32()
551                                 switch {
552                                 case r%11 == 0:
553                                         _, err := s.fs.MarshalManifest(".")
554                                         c.Check(err, check.IsNil)
555                                 case r&3 == 0:
556                                         f.Truncate(int64(rand.Intn(64)))
557                                 case r&3 == 1:
558                                         f.Seek(int64(rand.Intn(64)), io.SeekStart)
559                                 case r&3 == 2:
560                                         _, err := f.Write([]byte("beep boop"))
561                                         c.Check(err, check.IsNil)
562                                 case r&3 == 3:
563                                         _, err := ioutil.ReadAll(f)
564                                         c.Check(err, check.IsNil)
565                                 }
566                         }
567                 }()
568         }
569         wg.Wait()
570
571         f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
572         c.Assert(err, check.IsNil)
573         defer f.Close()
574         buf, err := ioutil.ReadAll(f)
575         c.Check(err, check.IsNil)
576         c.Logf("after lots of random r/w/seek/trunc, buf is %q", buf)
577 }
578
579 func (s *CollectionFSSuite) TestRandomWrites(c *check.C) {
580         maxBlockSize = 40
581         defer func() { maxBlockSize = 2 << 26 }()
582
583         var err error
584         s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
585         c.Assert(err, check.IsNil)
586
587         const nfiles = 256
588         const ngoroutines = 256
589
590         var wg sync.WaitGroup
591         for n := 0; n < ngoroutines; n++ {
592                 wg.Add(1)
593                 go func(n int) {
594                         defer wg.Done()
595                         expect := make([]byte, 0, 64)
596                         wbytes := []byte("there's no simple explanation for anything important that any of us do")
597                         f, err := s.fs.OpenFile(fmt.Sprintf("random-%d", n), os.O_RDWR|os.O_CREATE|os.O_EXCL, 0)
598                         c.Assert(err, check.IsNil)
599                         defer f.Close()
600                         for i := 0; i < nfiles; i++ {
601                                 trunc := rand.Intn(65)
602                                 woff := rand.Intn(trunc + 1)
603                                 wbytes = wbytes[:rand.Intn(64-woff+1)]
604                                 for buf, i := expect[:cap(expect)], len(expect); i < trunc; i++ {
605                                         buf[i] = 0
606                                 }
607                                 expect = expect[:trunc]
608                                 if trunc < woff+len(wbytes) {
609                                         expect = expect[:woff+len(wbytes)]
610                                 }
611                                 copy(expect[woff:], wbytes)
612                                 f.Truncate(int64(trunc))
613                                 pos, err := f.Seek(int64(woff), io.SeekStart)
614                                 c.Check(pos, check.Equals, int64(woff))
615                                 c.Check(err, check.IsNil)
616                                 n, err := f.Write(wbytes)
617                                 c.Check(n, check.Equals, len(wbytes))
618                                 c.Check(err, check.IsNil)
619                                 pos, err = f.Seek(0, io.SeekStart)
620                                 c.Check(pos, check.Equals, int64(0))
621                                 c.Check(err, check.IsNil)
622                                 buf, err := ioutil.ReadAll(f)
623                                 c.Check(string(buf), check.Equals, string(expect))
624                                 c.Check(err, check.IsNil)
625                         }
626                 }(n)
627         }
628         wg.Wait()
629
630         for n := 0; n < ngoroutines; n++ {
631                 f, err := s.fs.OpenFile(fmt.Sprintf("random-%d", n), os.O_RDONLY, 0)
632                 c.Assert(err, check.IsNil)
633                 f.(*filehandle).inode.(*filenode).waitPrune()
634                 s.checkMemSize(c, f)
635                 defer f.Close()
636         }
637
638         root, err := s.fs.Open("/")
639         c.Assert(err, check.IsNil)
640         defer root.Close()
641         fi, err := root.Readdir(-1)
642         c.Check(err, check.IsNil)
643         c.Check(len(fi), check.Equals, nfiles)
644
645         _, err = s.fs.MarshalManifest(".")
646         c.Check(err, check.IsNil)
647         // TODO: check manifest content
648 }
649
650 func (s *CollectionFSSuite) TestRemove(c *check.C) {
651         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
652         c.Assert(err, check.IsNil)
653         err = fs.Mkdir("dir0", 0755)
654         c.Assert(err, check.IsNil)
655         err = fs.Mkdir("dir1", 0755)
656         c.Assert(err, check.IsNil)
657         err = fs.Mkdir("dir1/dir2", 0755)
658         c.Assert(err, check.IsNil)
659         err = fs.Mkdir("dir1/dir3", 0755)
660         c.Assert(err, check.IsNil)
661
662         err = fs.Remove("dir0")
663         c.Check(err, check.IsNil)
664         err = fs.Remove("dir0")
665         c.Check(err, check.Equals, os.ErrNotExist)
666
667         err = fs.Remove("dir1/dir2/.")
668         c.Check(err, check.Equals, ErrInvalidArgument)
669         err = fs.Remove("dir1/dir2/..")
670         c.Check(err, check.Equals, ErrInvalidArgument)
671         err = fs.Remove("dir1")
672         c.Check(err, check.Equals, ErrDirectoryNotEmpty)
673         err = fs.Remove("dir1/dir2/../../../dir1")
674         c.Check(err, check.Equals, ErrDirectoryNotEmpty)
675         err = fs.Remove("dir1/dir3/")
676         c.Check(err, check.IsNil)
677         err = fs.RemoveAll("dir1")
678         c.Check(err, check.IsNil)
679         err = fs.RemoveAll("dir1")
680         c.Check(err, check.IsNil)
681 }
682
683 func (s *CollectionFSSuite) TestRenameError(c *check.C) {
684         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
685         c.Assert(err, check.IsNil)
686         err = fs.Mkdir("first", 0755)
687         c.Assert(err, check.IsNil)
688         err = fs.Mkdir("first/second", 0755)
689         c.Assert(err, check.IsNil)
690         f, err := fs.OpenFile("first/second/file", os.O_CREATE|os.O_WRONLY, 0755)
691         c.Assert(err, check.IsNil)
692         f.Write([]byte{1, 2, 3, 4, 5})
693         f.Close()
694         err = fs.Rename("first", "first/second/third")
695         c.Check(err, check.Equals, ErrInvalidArgument)
696         err = fs.Rename("first", "first/third")
697         c.Check(err, check.Equals, ErrInvalidArgument)
698         err = fs.Rename("first/second", "second")
699         c.Check(err, check.IsNil)
700         f, err = fs.OpenFile("second/file", 0, 0)
701         c.Assert(err, check.IsNil)
702         data, err := ioutil.ReadAll(f)
703         c.Check(err, check.IsNil)
704         c.Check(data, check.DeepEquals, []byte{1, 2, 3, 4, 5})
705 }
706
707 func (s *CollectionFSSuite) TestRenameDirectory(c *check.C) {
708         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
709         c.Assert(err, check.IsNil)
710         err = fs.Mkdir("foo", 0755)
711         c.Assert(err, check.IsNil)
712         err = fs.Mkdir("bar", 0755)
713         c.Assert(err, check.IsNil)
714         err = fs.Rename("bar", "baz")
715         c.Check(err, check.IsNil)
716         err = fs.Rename("foo", "baz")
717         c.Check(err, check.NotNil)
718         err = fs.Rename("foo", "baz/")
719         c.Check(err, check.IsNil)
720         err = fs.Rename("baz/foo", ".")
721         c.Check(err, check.Equals, ErrInvalidArgument)
722         err = fs.Rename("baz/foo/", ".")
723         c.Check(err, check.Equals, ErrInvalidArgument)
724 }
725
726 func (s *CollectionFSSuite) TestRename(c *check.C) {
727         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
728         c.Assert(err, check.IsNil)
729         const (
730                 outer = 16
731                 inner = 16
732         )
733         for i := 0; i < outer; i++ {
734                 err = fs.Mkdir(fmt.Sprintf("dir%d", i), 0755)
735                 c.Assert(err, check.IsNil)
736                 for j := 0; j < inner; j++ {
737                         err = fs.Mkdir(fmt.Sprintf("dir%d/dir%d", i, j), 0755)
738                         c.Assert(err, check.IsNil)
739                         for _, fnm := range []string{
740                                 fmt.Sprintf("dir%d/file%d", i, j),
741                                 fmt.Sprintf("dir%d/dir%d/file%d", i, j, j),
742                         } {
743                                 f, err := fs.OpenFile(fnm, os.O_CREATE|os.O_WRONLY, 0755)
744                                 c.Assert(err, check.IsNil)
745                                 _, err = f.Write([]byte("beep"))
746                                 c.Assert(err, check.IsNil)
747                                 f.Close()
748                         }
749                 }
750         }
751         var wg sync.WaitGroup
752         for i := 0; i < outer; i++ {
753                 for j := 0; j < inner; j++ {
754                         wg.Add(1)
755                         go func(i, j int) {
756                                 defer wg.Done()
757                                 oldname := fmt.Sprintf("dir%d/dir%d/file%d", i, j, j)
758                                 newname := fmt.Sprintf("dir%d/newfile%d", i, inner-j-1)
759                                 _, err := fs.Open(newname)
760                                 c.Check(err, check.Equals, os.ErrNotExist)
761                                 err = fs.Rename(oldname, newname)
762                                 c.Check(err, check.IsNil)
763                                 f, err := fs.Open(newname)
764                                 c.Check(err, check.IsNil)
765                                 f.Close()
766                         }(i, j)
767
768                         wg.Add(1)
769                         go func(i, j int) {
770                                 defer wg.Done()
771                                 // oldname does not exist
772                                 err := fs.Rename(
773                                         fmt.Sprintf("dir%d/dir%d/missing", i, j),
774                                         fmt.Sprintf("dir%d/dir%d/file%d", outer-i-1, j, j))
775                                 c.Check(err, check.ErrorMatches, `.*does not exist`)
776
777                                 // newname parent dir does not exist
778                                 err = fs.Rename(
779                                         fmt.Sprintf("dir%d/dir%d", i, j),
780                                         fmt.Sprintf("dir%d/missing/irrelevant", outer-i-1))
781                                 c.Check(err, check.ErrorMatches, `.*does not exist`)
782
783                                 // oldname parent dir is a file
784                                 err = fs.Rename(
785                                         fmt.Sprintf("dir%d/file%d/patherror", i, j),
786                                         fmt.Sprintf("dir%d/irrelevant", i))
787                                 c.Check(err, check.ErrorMatches, `.*not a directory`)
788
789                                 // newname parent dir is a file
790                                 err = fs.Rename(
791                                         fmt.Sprintf("dir%d/dir%d/file%d", i, j, j),
792                                         fmt.Sprintf("dir%d/file%d/patherror", i, inner-j-1))
793                                 c.Check(err, check.ErrorMatches, `.*not a directory`)
794                         }(i, j)
795                 }
796         }
797         wg.Wait()
798
799         f, err := fs.OpenFile("dir1/newfile3", 0, 0)
800         c.Assert(err, check.IsNil)
801         c.Check(f.Size(), check.Equals, int64(4))
802         buf, err := ioutil.ReadAll(f)
803         c.Check(buf, check.DeepEquals, []byte("beep"))
804         c.Check(err, check.IsNil)
805         _, err = fs.Open("dir1/dir1/file1")
806         c.Check(err, check.Equals, os.ErrNotExist)
807 }
808
809 func (s *CollectionFSSuite) TestPersist(c *check.C) {
810         maxBlockSize = 1024
811         defer func() { maxBlockSize = 2 << 26 }()
812
813         var err error
814         s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
815         c.Assert(err, check.IsNil)
816         err = s.fs.Mkdir("d:r", 0755)
817         c.Assert(err, check.IsNil)
818
819         expect := map[string][]byte{}
820
821         var wg sync.WaitGroup
822         for _, name := range []string{"random 1", "random:2", "random\\3", "d:r/random4"} {
823                 buf := make([]byte, 500)
824                 rand.Read(buf)
825                 expect[name] = buf
826
827                 f, err := s.fs.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0)
828                 c.Assert(err, check.IsNil)
829                 // Note: we don't close the file until after the test
830                 // is done. Writes to unclosed files should persist.
831                 defer f.Close()
832
833                 wg.Add(1)
834                 go func() {
835                         defer wg.Done()
836                         for i := 0; i < len(buf); i += 5 {
837                                 _, err := f.Write(buf[i : i+5])
838                                 c.Assert(err, check.IsNil)
839                         }
840                 }()
841         }
842         wg.Wait()
843
844         m, err := s.fs.MarshalManifest(".")
845         c.Check(err, check.IsNil)
846         c.Logf("%q", m)
847
848         root, err := s.fs.Open("/")
849         c.Assert(err, check.IsNil)
850         defer root.Close()
851         fi, err := root.Readdir(-1)
852         c.Check(err, check.IsNil)
853         c.Check(len(fi), check.Equals, 4)
854
855         persisted, err := (&Collection{ManifestText: m}).FileSystem(s.client, s.kc)
856         c.Assert(err, check.IsNil)
857
858         root, err = persisted.Open("/")
859         c.Assert(err, check.IsNil)
860         defer root.Close()
861         fi, err = root.Readdir(-1)
862         c.Check(err, check.IsNil)
863         c.Check(len(fi), check.Equals, 4)
864
865         for name, content := range expect {
866                 c.Logf("read %q", name)
867                 f, err := persisted.Open(name)
868                 c.Assert(err, check.IsNil)
869                 defer f.Close()
870                 buf, err := ioutil.ReadAll(f)
871                 c.Check(err, check.IsNil)
872                 c.Check(buf, check.DeepEquals, content)
873         }
874 }
875
876 func (s *CollectionFSSuite) TestPersistEmptyFilesAndDirs(c *check.C) {
877         var err error
878         s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
879         c.Assert(err, check.IsNil)
880         for _, name := range []string{"dir", "dir/zerodir", "empty", "not empty", "not empty/empty", "zero", "zero/zero"} {
881                 err = s.fs.Mkdir(name, 0755)
882                 c.Assert(err, check.IsNil)
883         }
884
885         expect := map[string][]byte{
886                 "0":                nil,
887                 "00":               {},
888                 "one":              {1},
889                 "dir/0":            nil,
890                 "dir/two":          {1, 2},
891                 "dir/zero":         nil,
892                 "dir/zerodir/zero": nil,
893                 "zero/zero/zero":   nil,
894         }
895         for name, data := range expect {
896                 f, err := s.fs.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0)
897                 c.Assert(err, check.IsNil)
898                 if data != nil {
899                         _, err := f.Write(data)
900                         c.Assert(err, check.IsNil)
901                 }
902                 f.Close()
903         }
904
905         m, err := s.fs.MarshalManifest(".")
906         c.Check(err, check.IsNil)
907         c.Logf("%q", m)
908
909         persisted, err := (&Collection{ManifestText: m}).FileSystem(s.client, s.kc)
910         c.Assert(err, check.IsNil)
911
912         for name, data := range expect {
913                 _, err = persisted.Open("bogus-" + name)
914                 c.Check(err, check.NotNil)
915
916                 f, err := persisted.Open(name)
917                 c.Assert(err, check.IsNil)
918
919                 if data == nil {
920                         data = []byte{}
921                 }
922                 buf, err := ioutil.ReadAll(f)
923                 c.Check(err, check.IsNil)
924                 c.Check(buf, check.DeepEquals, data)
925         }
926
927         expectDir := map[string]int{
928                 "empty":           0,
929                 "not empty":       1,
930                 "not empty/empty": 0,
931         }
932         for name, expectLen := range expectDir {
933                 _, err := persisted.Open(name + "/bogus")
934                 c.Check(err, check.NotNil)
935
936                 d, err := persisted.Open(name)
937                 defer d.Close()
938                 c.Check(err, check.IsNil)
939                 fi, err := d.Readdir(-1)
940                 c.Check(err, check.IsNil)
941                 c.Check(fi, check.HasLen, expectLen)
942         }
943 }
944
945 func (s *CollectionFSSuite) TestOpenFileFlags(c *check.C) {
946         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
947         c.Assert(err, check.IsNil)
948
949         f, err := fs.OpenFile("missing", os.O_WRONLY, 0)
950         c.Check(f, check.IsNil)
951         c.Check(err, check.ErrorMatches, `file does not exist`)
952
953         f, err = fs.OpenFile("new", os.O_CREATE|os.O_RDONLY, 0)
954         c.Assert(err, check.IsNil)
955         defer f.Close()
956         n, err := f.Write([]byte{1, 2, 3})
957         c.Check(n, check.Equals, 0)
958         c.Check(err, check.ErrorMatches, `read-only file`)
959         n, err = f.Read(make([]byte, 1))
960         c.Check(n, check.Equals, 0)
961         c.Check(err, check.Equals, io.EOF)
962         f, err = fs.OpenFile("new", os.O_RDWR, 0)
963         c.Assert(err, check.IsNil)
964         defer f.Close()
965         _, err = f.Write([]byte{4, 5, 6})
966         c.Check(err, check.IsNil)
967         fi, err := f.Stat()
968         c.Assert(err, check.IsNil)
969         c.Check(fi.Size(), check.Equals, int64(3))
970
971         f, err = fs.OpenFile("new", os.O_TRUNC|os.O_RDWR, 0)
972         c.Assert(err, check.IsNil)
973         defer f.Close()
974         pos, err := f.Seek(0, io.SeekEnd)
975         c.Check(pos, check.Equals, int64(0))
976         c.Check(err, check.IsNil)
977         fi, err = f.Stat()
978         c.Assert(err, check.IsNil)
979         c.Check(fi.Size(), check.Equals, int64(0))
980         fs.Remove("new")
981
982         buf := make([]byte, 64)
983         f, err = fs.OpenFile("append", os.O_EXCL|os.O_CREATE|os.O_RDWR|os.O_APPEND, 0)
984         c.Assert(err, check.IsNil)
985         f.Write([]byte{1, 2, 3})
986         f.Seek(0, io.SeekStart)
987         n, _ = f.Read(buf[:1])
988         c.Check(n, check.Equals, 1)
989         c.Check(buf[:1], check.DeepEquals, []byte{1})
990         pos, err = f.Seek(0, io.SeekCurrent)
991         c.Assert(err, check.IsNil)
992         c.Check(pos, check.Equals, int64(1))
993         f.Write([]byte{4, 5, 6})
994         pos, err = f.Seek(0, io.SeekCurrent)
995         c.Assert(err, check.IsNil)
996         c.Check(pos, check.Equals, int64(6))
997         f.Seek(0, io.SeekStart)
998         n, err = f.Read(buf)
999         c.Check(buf[:n], check.DeepEquals, []byte{1, 2, 3, 4, 5, 6})
1000         c.Check(err, check.Equals, io.EOF)
1001         f.Close()
1002
1003         f, err = fs.OpenFile("append", os.O_RDWR|os.O_APPEND, 0)
1004         c.Assert(err, check.IsNil)
1005         pos, err = f.Seek(0, io.SeekCurrent)
1006         c.Check(pos, check.Equals, int64(0))
1007         c.Check(err, check.IsNil)
1008         f.Read(buf[:3])
1009         pos, _ = f.Seek(0, io.SeekCurrent)
1010         c.Check(pos, check.Equals, int64(3))
1011         f.Write([]byte{7, 8, 9})
1012         pos, err = f.Seek(0, io.SeekCurrent)
1013         c.Check(err, check.IsNil)
1014         c.Check(pos, check.Equals, int64(9))
1015         f.Close()
1016
1017         f, err = fs.OpenFile("wronly", os.O_CREATE|os.O_WRONLY, 0)
1018         c.Assert(err, check.IsNil)
1019         n, err = f.Write([]byte{3, 2, 1})
1020         c.Check(n, check.Equals, 3)
1021         c.Check(err, check.IsNil)
1022         pos, _ = f.Seek(0, io.SeekCurrent)
1023         c.Check(pos, check.Equals, int64(3))
1024         pos, _ = f.Seek(0, io.SeekStart)
1025         c.Check(pos, check.Equals, int64(0))
1026         n, err = f.Read(buf)
1027         c.Check(n, check.Equals, 0)
1028         c.Check(err, check.ErrorMatches, `.*O_WRONLY.*`)
1029         f, err = fs.OpenFile("wronly", os.O_RDONLY, 0)
1030         c.Assert(err, check.IsNil)
1031         n, _ = f.Read(buf)
1032         c.Check(buf[:n], check.DeepEquals, []byte{3, 2, 1})
1033
1034         f, err = fs.OpenFile("unsupported", os.O_CREATE|os.O_SYNC, 0)
1035         c.Check(f, check.IsNil)
1036         c.Check(err, check.NotNil)
1037
1038         f, err = fs.OpenFile("append", os.O_RDWR|os.O_WRONLY, 0)
1039         c.Check(f, check.IsNil)
1040         c.Check(err, check.ErrorMatches, `invalid flag.*`)
1041 }
1042
// TestFlushFullBlocksWritingLongFile writes 50000 bytes to a single
// file with maxBlockSize=1024 and concurrentWriters=2. It checks that
// exactly one *memSegment remains once waitPrune() returns, and that
// none remain after MarshalManifest().
func (s *CollectionFSSuite) TestFlushFullBlocksWritingLongFile(c *check.C) {
        // Restore the package-level settings when the test ends. The
        // arguments are evaluated now, i.e., before the overrides
        // below.
        defer func(cw, mbs int) {
                concurrentWriters = cw
                maxBlockSize = mbs
        }(concurrentWriters, maxBlockSize)
        concurrentWriters = 2
        maxBlockSize = 1024

        proceed := make(chan struct{})
        // started counts onPut calls so far; concurrent counts onPut
        // calls currently in progress.
        var started, concurrent int32
        blk2done := false
        s.kc.onPut = func([]byte) {
                atomic.AddInt32(&concurrent, 1)
                switch atomic.AddInt32(&started, 1) {
                case 1:
                        // Wait until block 2 starts and finishes, and block 3 starts
                        select {
                        case <-proceed:
                                c.Check(blk2done, check.Equals, true)
                        case <-time.After(time.Second):
                                c.Error("timed out")
                        }
                case 2:
                        time.Sleep(time.Millisecond)
                        blk2done = true
                case 3:
                        // Release the first call, which is still
                        // waiting in case 1.
                        close(proceed)
                default:
                        time.Sleep(time.Millisecond)
                }
                // After this call leaves, fewer than
                // concurrentWriters calls should still be in
                // progress.
                c.Check(atomic.AddInt32(&concurrent, -1) < int32(concurrentWriters), check.Equals, true)
        }

        fs, err := (&Collection{}).FileSystem(s.client, s.kc)
        c.Assert(err, check.IsNil)
        f, err := fs.OpenFile("50K", os.O_WRONLY|os.O_CREATE, 0)
        c.Assert(err, check.IsNil)
        defer f.Close()

        data := make([]byte, 500)
        rand.Read(data)

        // 100 writes x 500 bytes = 50000 bytes.
        for i := 0; i < 100; i++ {
                n, err := f.Write(data)
                c.Assert(n, check.Equals, len(data))
                c.Assert(err, check.IsNil)
        }

        // currentMemExtents returns the indexes of the file's
        // segments that are *memSegment.
        currentMemExtents := func() (memExtents []int) {
                for idx, e := range f.(*filehandle).inode.(*filenode).segments {
                        switch e.(type) {
                        case *memSegment:
                                memExtents = append(memExtents, idx)
                        }
                }
                return
        }
        f.(*filehandle).inode.(*filenode).waitPrune()
        c.Check(currentMemExtents(), check.HasLen, 1)

        m, err := fs.MarshalManifest(".")
        c.Check(m, check.Matches, `[^:]* 0:50000:50K\n`)
        c.Check(err, check.IsNil)
        c.Check(currentMemExtents(), check.HasLen, 0)
}
1108
// Ensure blocks get flushed to disk if a lot of data is written to
// small files/directories without calling sync().
//
// Create two files in each of 256 top-level dirs, calling Flush()
// every 8 dirs. Ensure fs.memorySize() never exceeds 16MiB (1<<24).
//
// NOTE(review): the loop bound (j < 2) and the 1<<24 limit below do
// not correspond to a "four 512KiB files per dir, 24MiB limit"
// design; confirm which numbers are intended.
func (s *CollectionFSSuite) TestFlushAll(c *check.C) {
        fs, err := (&Collection{}).FileSystem(s.client, s.kc)
        c.Assert(err, check.IsNil)

        s.kc.onPut = func([]byte) {
                // discard flushed data -- otherwise the stub will use
                // unlimited memory
                time.Sleep(time.Millisecond)
                s.kc.Lock()
                defer s.kc.Unlock()
                s.kc.blocks = map[string][]byte{}
        }
        for i := 0; i < 256; i++ {
                // 512KiB of zeroes followed by a short text that
                // differs per dir.
                buf := bytes.NewBuffer(make([]byte, 524288))
                fmt.Fprintf(buf, "test file in dir%d", i)

                dir := fmt.Sprintf("dir%d", i)
                fs.Mkdir(dir, 0755)
                for j := 0; j < 2; j++ {
                        f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
                        c.Assert(err, check.IsNil)
                        defer f.Close()
                        // NOTE(review): io.Copy drains buf, so the
                        // second pass (j == 1) has nothing left to
                        // copy -- confirm this is intended.
                        _, err = io.Copy(f, buf)
                        c.Assert(err, check.IsNil)
                }

                if i%8 == 0 {
                        fs.Flush("", true)
                }

                // Stop at the first violation instead of reporting
                // one failure per remaining dir.
                size := fs.memorySize()
                if !c.Check(size <= 1<<24, check.Equals, true) {
                        c.Logf("at dir%d fs.memorySize()=%d", i, size)
                        return
                }
        }
}
1153
1154 // Ensure short blocks at the end of a stream don't get flushed by
1155 // Flush(false).
1156 //
1157 // Write 67x 1MiB files to each of 8 dirs, and check that 8 full 64MiB
1158 // blocks have been flushed while 8x 3MiB is still buffered in memory.
1159 func (s *CollectionFSSuite) TestFlushFullBlocksOnly(c *check.C) {
1160         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1161         c.Assert(err, check.IsNil)
1162
1163         var flushed int64
1164         s.kc.onPut = func(p []byte) {
1165                 atomic.AddInt64(&flushed, int64(len(p)))
1166         }
1167
1168         nDirs := int64(8)
1169         megabyte := make([]byte, 1<<20)
1170         for i := int64(0); i < nDirs; i++ {
1171                 dir := fmt.Sprintf("dir%d", i)
1172                 fs.Mkdir(dir, 0755)
1173                 for j := 0; j < 67; j++ {
1174                         f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
1175                         c.Assert(err, check.IsNil)
1176                         defer f.Close()
1177                         _, err = f.Write(megabyte)
1178                         c.Assert(err, check.IsNil)
1179                 }
1180         }
1181         c.Check(fs.memorySize(), check.Equals, int64(nDirs*67<<20))
1182         c.Check(flushed, check.Equals, int64(0))
1183
1184         waitForFlush := func(expectUnflushed, expectFlushed int64) {
1185                 for deadline := time.Now().Add(5 * time.Second); fs.memorySize() > expectUnflushed && time.Now().Before(deadline); time.Sleep(10 * time.Millisecond) {
1186                 }
1187                 c.Check(fs.memorySize(), check.Equals, expectUnflushed)
1188                 c.Check(flushed, check.Equals, expectFlushed)
1189         }
1190
1191         // Nothing flushed yet
1192         waitForFlush((nDirs*67)<<20, 0)
1193
1194         // Flushing a non-empty dir "/" is non-recursive and there are
1195         // no top-level files, so this has no effect
1196         fs.Flush("/", false)
1197         waitForFlush((nDirs*67)<<20, 0)
1198
1199         // Flush the full block in dir0
1200         fs.Flush("dir0", false)
1201         waitForFlush((nDirs*67-64)<<20, 64<<20)
1202
1203         err = fs.Flush("dir-does-not-exist", false)
1204         c.Check(err, check.NotNil)
1205
1206         // Flush full blocks in all dirs
1207         fs.Flush("", false)
1208         waitForFlush(nDirs*3<<20, nDirs*64<<20)
1209
1210         // Flush non-full blocks, too
1211         fs.Flush("", true)
1212         waitForFlush(0, nDirs*67<<20)
1213 }
1214
// Even when writing lots of files/dirs from different goroutines, as
// long as Flush(dir,false) is called after writing each file,
// unflushed data should be limited to one full block per
// concurrentWriter, plus one nearly-full block at the end of each
// dir/stream.
func (s *CollectionFSSuite) TestMaxUnflushed(c *check.C) {
        nDirs := int64(8)
        // (concurrentWriters + nDirs) x 64MiB (1<<26 bytes).
        maxUnflushed := (int64(concurrentWriters) + nDirs) << 26

        fs, err := (&Collection{}).FileSystem(s.client, s.kc)
        c.Assert(err, check.IsNil)

        release := make(chan struct{})
        // timeout is closed after 10 seconds so that blocked onPut
        // calls give up with an error instead of hanging forever.
        timeout := make(chan struct{})
        time.AfterFunc(10*time.Second, func() { close(timeout) })
        // putCount is the number of onPut calls so far; concurrency
        // is the number currently in progress.
        var putCount, concurrency int64
        // unflushed is bytes written to files (added by the writer
        // goroutines below) minus bytes passed to onPut.
        var unflushed int64
        s.kc.onPut = func(p []byte) {
                defer atomic.AddInt64(&unflushed, -int64(len(p)))
                cur := atomic.AddInt64(&concurrency, 1)
                defer atomic.AddInt64(&concurrency, -1)
                pc := atomic.AddInt64(&putCount, 1)
                if pc < int64(concurrentWriters) {
                        // Block until we reach concurrentWriters, to
                        // make sure we're really accepting concurrent
                        // writes.
                        select {
                        case <-release:
                        case <-timeout:
                                c.Error("timeout")
                        }
                } else if pc == int64(concurrentWriters) {
                        // Unblock the first N-1 PUT reqs.
                        close(release)
                }
                c.Assert(cur <= int64(concurrentWriters), check.Equals, true)
                c.Assert(atomic.LoadInt64(&unflushed) <= maxUnflushed, check.Equals, true)
        }

        // owg tracks the per-directory goroutines.
        var owg sync.WaitGroup
        megabyte := make([]byte, 1<<20)
        for i := int64(0); i < nDirs; i++ {
                dir := fmt.Sprintf("dir%d", i)
                fs.Mkdir(dir, 0755)
                owg.Add(1)
                go func() {
                        // Deferred calls run in reverse order: wait
                        // for this dir's writers, then flush the dir
                        // (including short blocks), then mark this
                        // goroutine done.
                        defer owg.Done()
                        defer fs.Flush(dir, true)
                        var iwg sync.WaitGroup
                        defer iwg.Wait()
                        // 67 concurrent writers per dir, each writing
                        // a 1MiB file and then flushing full blocks
                        // in that dir.
                        for j := 0; j < 67; j++ {
                                iwg.Add(1)
                                go func(j int) {
                                        defer iwg.Done()
                                        f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
                                        c.Assert(err, check.IsNil)
                                        defer f.Close()
                                        n, err := f.Write(megabyte)
                                        c.Assert(err, check.IsNil)
                                        atomic.AddInt64(&unflushed, int64(n))
                                        fs.Flush(dir, false)
                                }(j)
                        }
                }()
        }
        owg.Wait()
        fs.Flush("", true)
}
1283
1284 func (s *CollectionFSSuite) TestFlushStress(c *check.C) {
1285         done := false
1286         defer func() { done = true }()
1287         time.AfterFunc(10*time.Second, func() {
1288                 if !done {
1289                         pprof.Lookup("goroutine").WriteTo(os.Stderr, 1)
1290                         panic("timeout")
1291                 }
1292         })
1293
1294         wrote := 0
1295         s.kc.onPut = func(p []byte) {
1296                 s.kc.Lock()
1297                 s.kc.blocks = map[string][]byte{}
1298                 wrote++
1299                 defer c.Logf("wrote block %d, %d bytes", wrote, len(p))
1300                 s.kc.Unlock()
1301                 time.Sleep(20 * time.Millisecond)
1302         }
1303
1304         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1305         c.Assert(err, check.IsNil)
1306
1307         data := make([]byte, 1<<20)
1308         for i := 0; i < 3; i++ {
1309                 dir := fmt.Sprintf("dir%d", i)
1310                 fs.Mkdir(dir, 0755)
1311                 for j := 0; j < 200; j++ {
1312                         data[0] = byte(j)
1313                         f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
1314                         c.Assert(err, check.IsNil)
1315                         _, err = f.Write(data)
1316                         c.Assert(err, check.IsNil)
1317                         f.Close()
1318                         fs.Flush(dir, false)
1319                 }
1320                 _, err := fs.MarshalManifest(".")
1321                 c.Check(err, check.IsNil)
1322         }
1323 }
1324
1325 func (s *CollectionFSSuite) TestFlushShort(c *check.C) {
1326         s.kc.onPut = func([]byte) {
1327                 s.kc.Lock()
1328                 s.kc.blocks = map[string][]byte{}
1329                 s.kc.Unlock()
1330         }
1331         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1332         c.Assert(err, check.IsNil)
1333         for _, blocksize := range []int{8, 1000000} {
1334                 dir := fmt.Sprintf("dir%d", blocksize)
1335                 err = fs.Mkdir(dir, 0755)
1336                 c.Assert(err, check.IsNil)
1337                 data := make([]byte, blocksize)
1338                 for i := 0; i < 100; i++ {
1339                         f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, i), os.O_WRONLY|os.O_CREATE, 0)
1340                         c.Assert(err, check.IsNil)
1341                         _, err = f.Write(data)
1342                         c.Assert(err, check.IsNil)
1343                         f.Close()
1344                         fs.Flush(dir, false)
1345                 }
1346                 fs.Flush(dir, true)
1347                 _, err := fs.MarshalManifest(".")
1348                 c.Check(err, check.IsNil)
1349         }
1350 }
1351
1352 func (s *CollectionFSSuite) TestBrokenManifests(c *check.C) {
1353         for _, txt := range []string{
1354                 "\n",
1355                 ".\n",
1356                 ". \n",
1357                 ". d41d8cd98f00b204e9800998ecf8427e+0\n",
1358                 ". d41d8cd98f00b204e9800998ecf8427e+0 \n",
1359                 ". 0:0:foo\n",
1360                 ".  0:0:foo\n",
1361                 ". 0:0:foo 0:0:bar\n",
1362                 ". d41d8cd98f00b204e9800998ecf8427e 0:0:foo\n",
1363                 ". d41d8cd98f00b204e9800998ecf8427e+0 :0:0:foo\n",
1364                 ". d41d8cd98f00b204e9800998ecf8427e+0 foo:0:foo\n",
1365                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:foo:foo\n",
1366                 ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:foo 1:1:bar\n",
1367                 ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:\\056\n",
1368                 ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:\\056\\057\\056\n",
1369                 ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:.\n",
1370                 ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:..\n",
1371                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:..\n",
1372                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/..\n",
1373                 ". d41d8cd98f00b204e9800998ecf8427e+1 0:0:foo\n./foo d41d8cd98f00b204e9800998ecf8427e+1 0:0:bar\n",
1374                 "./foo d41d8cd98f00b204e9800998ecf8427e+1 0:0:bar\n. d41d8cd98f00b204e9800998ecf8427e+1 0:0:foo\n",
1375         } {
1376                 c.Logf("<-%q", txt)
1377                 fs, err := (&Collection{ManifestText: txt}).FileSystem(s.client, s.kc)
1378                 c.Check(fs, check.IsNil)
1379                 c.Logf("-> %s", err)
1380                 c.Check(err, check.NotNil)
1381         }
1382 }
1383
1384 func (s *CollectionFSSuite) TestEdgeCaseManifests(c *check.C) {
1385         for _, txt := range []string{
1386                 "",
1387                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo\n",
1388                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:...\n",
1389                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:. 0:0:. 0:0:\\056 0:0:\\056\n",
1390                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/. 0:0:. 0:0:foo\\057bar\\057\\056\n",
1391                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo 0:0:foo 0:0:bar\n",
1392                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/bar\n./foo d41d8cd98f00b204e9800998ecf8427e+0 0:0:bar\n",
1393         } {
1394                 c.Logf("<-%q", txt)
1395                 fs, err := (&Collection{ManifestText: txt}).FileSystem(s.client, s.kc)
1396                 c.Check(err, check.IsNil)
1397                 c.Check(fs, check.NotNil)
1398         }
1399 }
1400
1401 func (s *CollectionFSSuite) checkMemSize(c *check.C, f File) {
1402         fn := f.(*filehandle).inode.(*filenode)
1403         var memsize int64
1404         for _, seg := range fn.segments {
1405                 if e, ok := seg.(*memSegment); ok {
1406                         memsize += int64(len(e.buf))
1407                 }
1408         }
1409         c.Check(fn.memsize, check.Equals, memsize)
1410 }
1411
// CollectionFSUnitSuite is a gocheck suite with no fixture state of
// its own.
type CollectionFSUnitSuite struct{}

// Register the suite with gocheck.
var _ = check.Suite(&CollectionFSUnitSuite{})
1415
1416 // expect ~2 seconds to load a manifest with 256K files
1417 func (s *CollectionFSUnitSuite) TestLargeManifest(c *check.C) {
1418         if testing.Short() {
1419                 c.Skip("slow")
1420         }
1421
1422         const (
1423                 dirCount  = 512
1424                 fileCount = 512
1425         )
1426
1427         mb := bytes.NewBuffer(make([]byte, 0, 40000000))
1428         for i := 0; i < dirCount; i++ {
1429                 fmt.Fprintf(mb, "./dir%d", i)
1430                 for j := 0; j <= fileCount; j++ {
1431                         fmt.Fprintf(mb, " %032x+42+A%040x@%08x", j, j, j)
1432                 }
1433                 for j := 0; j < fileCount; j++ {
1434                         fmt.Fprintf(mb, " %d:%d:dir%d/file%d", j*42+21, 42, j, j)
1435                 }
1436                 mb.Write([]byte{'\n'})
1437         }
1438         coll := Collection{ManifestText: mb.String()}
1439         c.Logf("%s built", time.Now())
1440
1441         var memstats runtime.MemStats
1442         runtime.ReadMemStats(&memstats)
1443         c.Logf("%s Alloc=%d Sys=%d", time.Now(), memstats.Alloc, memstats.Sys)
1444
1445         f, err := coll.FileSystem(nil, nil)
1446         c.Check(err, check.IsNil)
1447         c.Logf("%s loaded", time.Now())
1448         c.Check(f.Size(), check.Equals, int64(42*dirCount*fileCount))
1449
1450         for i := 0; i < dirCount; i++ {
1451                 for j := 0; j < fileCount; j++ {
1452                         f.Stat(fmt.Sprintf("./dir%d/dir%d/file%d", i, j, j))
1453                 }
1454         }
1455         c.Logf("%s Stat() x %d", time.Now(), dirCount*fileCount)
1456
1457         runtime.ReadMemStats(&memstats)
1458         c.Logf("%s Alloc=%d Sys=%d", time.Now(), memstats.Alloc, memstats.Sys)
1459 }
1460
// Gocheck boilerplate: Test hands control to gocheck, which runs the
// suites registered in this package via check.Suite(), so they are
// executed by "go test".
func Test(t *testing.T) {
        check.TestingT(t)
}